blob: 042879dce7020fdd4ff83116552e014a6c440262 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000032import abc
Antoine Pitrou3ebaed62010-08-21 19:17:25 +000033import signal
34import errno
Georg Brandla4f46e12010-02-07 17:03:15 +000035from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000036from collections import deque
37from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000038
39import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000042try:
43 import threading
44except ImportError:
45 threading = None
Antoine Pitrou19690592009-06-12 20:14:08 +000046
47__metaclass__ = type
48bytes = support.py3k_bytes
49
50def _default_chunk_size():
51 """Get the default TextIOWrapper chunk size"""
52 with io.open(__file__, "r", encoding="latin1") as f:
53 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000054
55
Antoine Pitrou6391b342010-09-14 18:48:19 +000056class MockRawIOWithoutRead:
57 """A RawIO implementation without read(), so as to exercise the default
58 RawIO.read() which calls readinto()."""
Christian Heimes1a6387e2008-03-26 12:49:49 +000059
60 def __init__(self, read_stack=()):
61 self._read_stack = list(read_stack)
62 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000063 self._reads = 0
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000064 self._extraneous_reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000065
Christian Heimes1a6387e2008-03-26 12:49:49 +000066 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000067 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000068 return len(b)
69
70 def writable(self):
71 return True
72
73 def fileno(self):
74 return 42
75
76 def readable(self):
77 return True
78
79 def seekable(self):
80 return True
81
82 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000083 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000084
85 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000086 return 0 # same comment as above
87
88 def readinto(self, buf):
89 self._reads += 1
90 max_len = len(buf)
91 try:
92 data = self._read_stack[0]
93 except IndexError:
Antoine Pitroucb4f47c2010-08-11 13:40:17 +000094 self._extraneous_reads += 1
Antoine Pitrou19690592009-06-12 20:14:08 +000095 return 0
96 if data is None:
97 del self._read_stack[0]
98 return None
99 n = len(data)
100 if len(data) <= max_len:
101 del self._read_stack[0]
102 buf[:n] = data
103 return n
104 else:
105 buf[:] = data[:max_len]
106 self._read_stack[0] = data[max_len:]
107 return max_len
108
109 def truncate(self, pos=None):
110 return pos
111
Antoine Pitrou6391b342010-09-14 18:48:19 +0000112class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
113 pass
114
115class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
116 pass
117
118
119class MockRawIO(MockRawIOWithoutRead):
120
121 def read(self, n=None):
122 self._reads += 1
123 try:
124 return self._read_stack.pop(0)
125 except:
126 self._extraneous_reads += 1
127 return b""
128
Antoine Pitrou19690592009-06-12 20:14:08 +0000129class CMockRawIO(MockRawIO, io.RawIOBase):
130 pass
131
132class PyMockRawIO(MockRawIO, pyio.RawIOBase):
133 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000134
135
Antoine Pitrou19690592009-06-12 20:14:08 +0000136class MisbehavedRawIO(MockRawIO):
137 def write(self, b):
138 return MockRawIO.write(self, b) * 2
139
140 def read(self, n=None):
141 return MockRawIO.read(self, n) * 2
142
143 def seek(self, pos, whence):
144 return -123
145
146 def tell(self):
147 return -456
148
149 def readinto(self, buf):
150 MockRawIO.readinto(self, buf)
151 return len(buf) * 5
152
153class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
154 pass
155
156class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
157 pass
158
159
160class CloseFailureIO(MockRawIO):
161 closed = 0
162
163 def close(self):
164 if not self.closed:
165 self.closed = 1
166 raise IOError
167
168class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
169 pass
170
171class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
172 pass
173
174
175class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000176
177 def __init__(self, data):
178 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000179 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000180
181 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000182 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000183 self.read_history.append(None if res is None else len(res))
184 return res
185
Antoine Pitrou19690592009-06-12 20:14:08 +0000186 def readinto(self, b):
187 res = super(MockFileIO, self).readinto(b)
188 self.read_history.append(res)
189 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000190
Antoine Pitrou19690592009-06-12 20:14:08 +0000191class CMockFileIO(MockFileIO, io.BytesIO):
192 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000193
Antoine Pitrou19690592009-06-12 20:14:08 +0000194class PyMockFileIO(MockFileIO, pyio.BytesIO):
195 pass
196
197
198class MockNonBlockWriterIO:
199
200 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000201 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000202 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000203
Antoine Pitrou19690592009-06-12 20:14:08 +0000204 def pop_written(self):
205 s = b"".join(self._write_stack)
206 self._write_stack[:] = []
207 return s
208
209 def block_on(self, char):
210 """Block when a given char is encountered."""
211 self._blocker_char = char
212
213 def readable(self):
214 return True
215
216 def seekable(self):
217 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000218
219 def writable(self):
220 return True
221
Antoine Pitrou19690592009-06-12 20:14:08 +0000222 def write(self, b):
223 b = bytes(b)
224 n = -1
225 if self._blocker_char:
226 try:
227 n = b.index(self._blocker_char)
228 except ValueError:
229 pass
230 else:
231 self._blocker_char = None
232 self._write_stack.append(b[:n])
233 raise self.BlockingIOError(0, "test blocking", n)
234 self._write_stack.append(b)
235 return len(b)
236
237class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
238 BlockingIOError = io.BlockingIOError
239
240class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
241 BlockingIOError = pyio.BlockingIOError
242
Christian Heimes1a6387e2008-03-26 12:49:49 +0000243
244class IOTest(unittest.TestCase):
245
Antoine Pitrou19690592009-06-12 20:14:08 +0000246 def setUp(self):
247 support.unlink(support.TESTFN)
248
Christian Heimes1a6387e2008-03-26 12:49:49 +0000249 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000250 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000251
252 def write_ops(self, f):
253 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000254 f.truncate(0)
255 self.assertEqual(f.tell(), 5)
256 f.seek(0)
257
258 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000259 self.assertEqual(f.seek(0), 0)
260 self.assertEqual(f.write(b"Hello."), 6)
261 self.assertEqual(f.tell(), 6)
262 self.assertEqual(f.seek(-1, 1), 5)
263 self.assertEqual(f.tell(), 5)
264 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
265 self.assertEqual(f.seek(0), 0)
266 self.assertEqual(f.write(b"h"), 1)
267 self.assertEqual(f.seek(-1, 2), 13)
268 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000269
Christian Heimes1a6387e2008-03-26 12:49:49 +0000270 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000271 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000272 self.assertRaises(TypeError, f.seek, 0.0)
273
274 def read_ops(self, f, buffered=False):
275 data = f.read(5)
276 self.assertEqual(data, b"hello")
277 data = bytearray(data)
278 self.assertEqual(f.readinto(data), 5)
279 self.assertEqual(data, b" worl")
280 self.assertEqual(f.readinto(data), 2)
281 self.assertEqual(len(data), 5)
282 self.assertEqual(data[:2], b"d\n")
283 self.assertEqual(f.seek(0), 0)
284 self.assertEqual(f.read(20), b"hello world\n")
285 self.assertEqual(f.read(1), b"")
286 self.assertEqual(f.readinto(bytearray(b"x")), 0)
287 self.assertEqual(f.seek(-6, 2), 6)
288 self.assertEqual(f.read(5), b"world")
289 self.assertEqual(f.read(0), b"")
290 self.assertEqual(f.readinto(bytearray()), 0)
291 self.assertEqual(f.seek(-6, 1), 5)
292 self.assertEqual(f.read(5), b" worl")
293 self.assertEqual(f.tell(), 10)
294 self.assertRaises(TypeError, f.seek, 0.0)
295 if buffered:
296 f.seek(0)
297 self.assertEqual(f.read(), b"hello world\n")
298 f.seek(6)
299 self.assertEqual(f.read(), b"world\n")
300 self.assertEqual(f.read(), b"")
301
302 LARGE = 2**31
303
304 def large_file_ops(self, f):
305 assert f.readable()
306 assert f.writable()
307 self.assertEqual(f.seek(self.LARGE), self.LARGE)
308 self.assertEqual(f.tell(), self.LARGE)
309 self.assertEqual(f.write(b"xxx"), 3)
310 self.assertEqual(f.tell(), self.LARGE + 3)
311 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
312 self.assertEqual(f.truncate(), self.LARGE + 2)
313 self.assertEqual(f.tell(), self.LARGE + 2)
314 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
315 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000316 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000317 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
318 self.assertEqual(f.seek(-1, 2), self.LARGE)
319 self.assertEqual(f.read(2), b"x")
320
Antoine Pitrou19690592009-06-12 20:14:08 +0000321 def test_invalid_operations(self):
322 # Try writing on a file opened in read mode and vice-versa.
323 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000324 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000325 self.assertRaises(IOError, fp.read)
326 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000327 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000328 self.assertRaises(IOError, fp.write, b"blah")
329 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000330 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000331 self.assertRaises(IOError, fp.write, "blah")
332 self.assertRaises(IOError, fp.writelines, ["blah\n"])
333
Christian Heimes1a6387e2008-03-26 12:49:49 +0000334 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000335 with self.open(support.TESTFN, "wb", buffering=0) as f:
336 self.assertEqual(f.readable(), False)
337 self.assertEqual(f.writable(), True)
338 self.assertEqual(f.seekable(), True)
339 self.write_ops(f)
340 with self.open(support.TESTFN, "rb", buffering=0) as f:
341 self.assertEqual(f.readable(), True)
342 self.assertEqual(f.writable(), False)
343 self.assertEqual(f.seekable(), True)
344 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000345
346 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000347 with self.open(support.TESTFN, "wb") as f:
348 self.assertEqual(f.readable(), False)
349 self.assertEqual(f.writable(), True)
350 self.assertEqual(f.seekable(), True)
351 self.write_ops(f)
352 with self.open(support.TESTFN, "rb") as f:
353 self.assertEqual(f.readable(), True)
354 self.assertEqual(f.writable(), False)
355 self.assertEqual(f.seekable(), True)
356 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000357
358 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000359 with self.open(support.TESTFN, "wb") as f:
360 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
361 with self.open(support.TESTFN, "rb") as f:
362 self.assertEqual(f.readline(), b"abc\n")
363 self.assertEqual(f.readline(10), b"def\n")
364 self.assertEqual(f.readline(2), b"xy")
365 self.assertEqual(f.readline(4), b"zzy\n")
366 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000367 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000368 self.assertRaises(TypeError, f.readline, 5.3)
369 with self.open(support.TESTFN, "r") as f:
370 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000371
372 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000373 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000374 self.write_ops(f)
375 data = f.getvalue()
376 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000377 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000378 self.read_ops(f, True)
379
380 def test_large_file_ops(self):
381 # On Windows and Mac OSX this test comsumes large resources; It takes
382 # a long time to build the >2GB file and takes >2GB of disk space
383 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000384 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
385 if not support.is_resource_enabled("largefile"):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000386 print("\nTesting large file ops skipped on %s." % sys.platform,
387 file=sys.stderr)
388 print("It requires %d bytes and a long time." % self.LARGE,
389 file=sys.stderr)
390 print("Use 'regrtest.py -u largefile test_io' to run it.",
391 file=sys.stderr)
392 return
Antoine Pitrou19690592009-06-12 20:14:08 +0000393 with self.open(support.TESTFN, "w+b", 0) as f:
394 self.large_file_ops(f)
395 with self.open(support.TESTFN, "w+b") as f:
396 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000397
398 def test_with_open(self):
399 for bufsize in (0, 1, 100):
400 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000401 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000402 f.write(b"xxx")
403 self.assertEqual(f.closed, True)
404 f = None
405 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000406 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000407 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000408 except ZeroDivisionError:
409 self.assertEqual(f.closed, True)
410 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000411 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000412
Antoine Pitroue741cc62009-01-21 00:45:36 +0000413 # issue 5008
414 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000415 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000416 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000417 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000418 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000419 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000420 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000421 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000422 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000423
Christian Heimes1a6387e2008-03-26 12:49:49 +0000424 def test_destructor(self):
425 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000426 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000427 def __del__(self):
428 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000429 try:
430 f = super(MyFileIO, self).__del__
431 except AttributeError:
432 pass
433 else:
434 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000435 def close(self):
436 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000437 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000438 def flush(self):
439 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000440 super(MyFileIO, self).flush()
441 f = MyFileIO(support.TESTFN, "wb")
442 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000443 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000444 support.gc_collect()
445 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000446 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000447 self.assertEqual(f.read(), b"xxx")
448
449 def _check_base_destructor(self, base):
450 record = []
451 class MyIO(base):
452 def __init__(self):
453 # This exercises the availability of attributes on object
454 # destruction.
455 # (in the C version, close() is called by the tp_dealloc
456 # function, not by __del__)
457 self.on_del = 1
458 self.on_close = 2
459 self.on_flush = 3
460 def __del__(self):
461 record.append(self.on_del)
462 try:
463 f = super(MyIO, self).__del__
464 except AttributeError:
465 pass
466 else:
467 f()
468 def close(self):
469 record.append(self.on_close)
470 super(MyIO, self).close()
471 def flush(self):
472 record.append(self.on_flush)
473 super(MyIO, self).flush()
474 f = MyIO()
475 del f
476 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000477 self.assertEqual(record, [1, 2, 3])
478
Antoine Pitrou19690592009-06-12 20:14:08 +0000479 def test_IOBase_destructor(self):
480 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000481
Antoine Pitrou19690592009-06-12 20:14:08 +0000482 def test_RawIOBase_destructor(self):
483 self._check_base_destructor(self.RawIOBase)
484
485 def test_BufferedIOBase_destructor(self):
486 self._check_base_destructor(self.BufferedIOBase)
487
488 def test_TextIOBase_destructor(self):
489 self._check_base_destructor(self.TextIOBase)
490
491 def test_close_flushes(self):
492 with self.open(support.TESTFN, "wb") as f:
493 f.write(b"xxx")
494 with self.open(support.TESTFN, "rb") as f:
495 self.assertEqual(f.read(), b"xxx")
496
497 def test_array_writes(self):
498 a = array.array(b'i', range(10))
499 n = len(a.tostring())
500 with self.open(support.TESTFN, "wb", 0) as f:
501 self.assertEqual(f.write(a), n)
502 with self.open(support.TESTFN, "wb") as f:
503 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000504
505 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000506 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000507 closefd=False)
508
Antoine Pitrou19690592009-06-12 20:14:08 +0000509 def test_read_closed(self):
510 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000511 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000512 with self.open(support.TESTFN, "r") as f:
513 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000514 self.assertEqual(file.read(), "egg\n")
515 file.seek(0)
516 file.close()
517 self.assertRaises(ValueError, file.read)
518
519 def test_no_closefd_with_filename(self):
520 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000521 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000522
523 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000524 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000525 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000526 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000527 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000528 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000529 self.assertEqual(file.buffer.raw.closefd, False)
530
Antoine Pitrou19690592009-06-12 20:14:08 +0000531 def test_garbage_collection(self):
532 # FileIO objects are collected, and collecting them flushes
533 # all data to disk.
534 f = self.FileIO(support.TESTFN, "wb")
535 f.write(b"abcxxx")
536 f.f = f
537 wr = weakref.ref(f)
538 del f
539 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000540 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000541 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000542 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000543
Antoine Pitrou19690592009-06-12 20:14:08 +0000544 def test_unbounded_file(self):
545 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
546 zero = "/dev/zero"
547 if not os.path.exists(zero):
548 self.skipTest("{0} does not exist".format(zero))
549 if sys.maxsize > 0x7FFFFFFF:
550 self.skipTest("test can only run in a 32-bit address space")
551 if support.real_max_memuse < support._2G:
552 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000553 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000554 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000555 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000556 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000557 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000558 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000559
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000560 def test_flush_error_on_close(self):
561 f = self.open(support.TESTFN, "wb", buffering=0)
562 def bad_flush():
563 raise IOError()
564 f.flush = bad_flush
565 self.assertRaises(IOError, f.close) # exception not swallowed
566
567 def test_multi_close(self):
568 f = self.open(support.TESTFN, "wb", buffering=0)
569 f.close()
570 f.close()
571 f.close()
572 self.assertRaises(ValueError, f.flush)
573
Antoine Pitrou6391b342010-09-14 18:48:19 +0000574 def test_RawIOBase_read(self):
575 # Exercise the default RawIOBase.read() implementation (which calls
576 # readinto() internally).
577 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
578 self.assertEqual(rawio.read(2), b"ab")
579 self.assertEqual(rawio.read(2), b"c")
580 self.assertEqual(rawio.read(2), b"d")
581 self.assertEqual(rawio.read(2), None)
582 self.assertEqual(rawio.read(2), b"ef")
583 self.assertEqual(rawio.read(2), b"g")
584 self.assertEqual(rawio.read(2), None)
585 self.assertEqual(rawio.read(2), b"")
586
Antoine Pitrou19690592009-06-12 20:14:08 +0000587class CIOTest(IOTest):
588 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000589
Antoine Pitrou19690592009-06-12 20:14:08 +0000590class PyIOTest(IOTest):
591 test_array_writes = unittest.skip(
592 "len(array.array) returns number of elements rather than bytelength"
593 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000594
595
Antoine Pitrou19690592009-06-12 20:14:08 +0000596class CommonBufferedTests:
597 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
598
599 def test_detach(self):
600 raw = self.MockRawIO()
601 buf = self.tp(raw)
602 self.assertIs(buf.detach(), raw)
603 self.assertRaises(ValueError, buf.detach)
604
605 def test_fileno(self):
606 rawio = self.MockRawIO()
607 bufio = self.tp(rawio)
608
Ezio Melotti2623a372010-11-21 13:34:58 +0000609 self.assertEqual(42, bufio.fileno())
Antoine Pitrou19690592009-06-12 20:14:08 +0000610
611 def test_no_fileno(self):
612 # XXX will we always have fileno() function? If so, kill
613 # this test. Else, write it.
614 pass
615
616 def test_invalid_args(self):
617 rawio = self.MockRawIO()
618 bufio = self.tp(rawio)
619 # Invalid whence
620 self.assertRaises(ValueError, bufio.seek, 0, -1)
621 self.assertRaises(ValueError, bufio.seek, 0, 3)
622
623 def test_override_destructor(self):
624 tp = self.tp
625 record = []
626 class MyBufferedIO(tp):
627 def __del__(self):
628 record.append(1)
629 try:
630 f = super(MyBufferedIO, self).__del__
631 except AttributeError:
632 pass
633 else:
634 f()
635 def close(self):
636 record.append(2)
637 super(MyBufferedIO, self).close()
638 def flush(self):
639 record.append(3)
640 super(MyBufferedIO, self).flush()
641 rawio = self.MockRawIO()
642 bufio = MyBufferedIO(rawio)
643 writable = bufio.writable()
644 del bufio
645 support.gc_collect()
646 if writable:
647 self.assertEqual(record, [1, 2, 3])
648 else:
649 self.assertEqual(record, [1, 2])
650
651 def test_context_manager(self):
652 # Test usability as a context manager
653 rawio = self.MockRawIO()
654 bufio = self.tp(rawio)
655 def _with():
656 with bufio:
657 pass
658 _with()
659 # bufio should now be closed, and using it a second time should raise
660 # a ValueError.
661 self.assertRaises(ValueError, _with)
662
663 def test_error_through_destructor(self):
664 # Test that the exception state is not modified by a destructor,
665 # even if close() fails.
666 rawio = self.CloseFailureIO()
667 def f():
668 self.tp(rawio).xyzzy
669 with support.captured_output("stderr") as s:
670 self.assertRaises(AttributeError, f)
671 s = s.getvalue().strip()
672 if s:
673 # The destructor *may* have printed an unraisable error, check it
674 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000675 self.assertTrue(s.startswith("Exception IOError: "), s)
676 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000677
678 def test_repr(self):
679 raw = self.MockRawIO()
680 b = self.tp(raw)
681 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
682 self.assertEqual(repr(b), "<%s>" % clsname)
683 raw.name = "dummy"
684 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
685 raw.name = b"dummy"
686 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000687
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000688 def test_flush_error_on_close(self):
689 raw = self.MockRawIO()
690 def bad_flush():
691 raise IOError()
692 raw.flush = bad_flush
693 b = self.tp(raw)
694 self.assertRaises(IOError, b.close) # exception not swallowed
695
696 def test_multi_close(self):
697 raw = self.MockRawIO()
698 b = self.tp(raw)
699 b.close()
700 b.close()
701 b.close()
702 self.assertRaises(ValueError, b.flush)
703
Christian Heimes1a6387e2008-03-26 12:49:49 +0000704
Antoine Pitrou19690592009-06-12 20:14:08 +0000705class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
706 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000707
Antoine Pitrou19690592009-06-12 20:14:08 +0000708 def test_constructor(self):
709 rawio = self.MockRawIO([b"abc"])
710 bufio = self.tp(rawio)
711 bufio.__init__(rawio)
712 bufio.__init__(rawio, buffer_size=1024)
713 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000714 self.assertEqual(b"abc", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000715 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
716 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
717 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
718 rawio = self.MockRawIO([b"abc"])
719 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000720 self.assertEqual(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000721
Antoine Pitrou19690592009-06-12 20:14:08 +0000722 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000723 for arg in (None, 7):
724 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
725 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000726 self.assertEqual(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000727 # Invalid args
728 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000729
Antoine Pitrou19690592009-06-12 20:14:08 +0000730 def test_read1(self):
731 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
732 bufio = self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000733 self.assertEqual(b"a", bufio.read(1))
734 self.assertEqual(b"b", bufio.read1(1))
735 self.assertEqual(rawio._reads, 1)
736 self.assertEqual(b"c", bufio.read1(100))
737 self.assertEqual(rawio._reads, 1)
738 self.assertEqual(b"d", bufio.read1(100))
739 self.assertEqual(rawio._reads, 2)
740 self.assertEqual(b"efg", bufio.read1(100))
741 self.assertEqual(rawio._reads, 3)
742 self.assertEqual(b"", bufio.read1(100))
743 self.assertEqual(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000744 # Invalid args
745 self.assertRaises(ValueError, bufio.read1, -1)
746
747 def test_readinto(self):
748 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
749 bufio = self.tp(rawio)
750 b = bytearray(2)
Ezio Melotti2623a372010-11-21 13:34:58 +0000751 self.assertEqual(bufio.readinto(b), 2)
752 self.assertEqual(b, b"ab")
753 self.assertEqual(bufio.readinto(b), 2)
754 self.assertEqual(b, b"cd")
755 self.assertEqual(bufio.readinto(b), 2)
756 self.assertEqual(b, b"ef")
757 self.assertEqual(bufio.readinto(b), 1)
758 self.assertEqual(b, b"gf")
759 self.assertEqual(bufio.readinto(b), 0)
760 self.assertEqual(b, b"gf")
Antoine Pitrou19690592009-06-12 20:14:08 +0000761
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000762 def test_readlines(self):
763 def bufio():
764 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
765 return self.tp(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000766 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
767 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
768 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000769
Antoine Pitrou19690592009-06-12 20:14:08 +0000770 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000771 data = b"abcdefghi"
772 dlen = len(data)
773
774 tests = [
775 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
776 [ 100, [ 3, 3, 3], [ dlen ] ],
777 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
778 ]
779
780 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000781 rawio = self.MockFileIO(data)
782 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000783 pos = 0
784 for nbytes in buf_read_sizes:
Ezio Melotti2623a372010-11-21 13:34:58 +0000785 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Christian Heimes1a6387e2008-03-26 12:49:49 +0000786 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000787 # this is mildly implementation-dependent
Ezio Melotti2623a372010-11-21 13:34:58 +0000788 self.assertEqual(rawio.read_history, raw_read_sizes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000789
Antoine Pitrou19690592009-06-12 20:14:08 +0000790 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000791 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000792 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
793 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000794
Ezio Melotti2623a372010-11-21 13:34:58 +0000795 self.assertEqual(b"abcd", bufio.read(6))
796 self.assertEqual(b"e", bufio.read(1))
797 self.assertEqual(b"fg", bufio.read())
798 self.assertEqual(b"", bufio.peek(1))
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000799 self.assertTrue(None is bufio.read())
Ezio Melotti2623a372010-11-21 13:34:58 +0000800 self.assertEqual(b"", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000801
Antoine Pitrou19690592009-06-12 20:14:08 +0000802 def test_read_past_eof(self):
803 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
804 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000805
Ezio Melotti2623a372010-11-21 13:34:58 +0000806 self.assertEqual(b"abcdefg", bufio.read(9000))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000807
Antoine Pitrou19690592009-06-12 20:14:08 +0000808 def test_read_all(self):
809 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
810 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000811
Ezio Melotti2623a372010-11-21 13:34:58 +0000812 self.assertEqual(b"abcdefg", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000813
Victor Stinner6a102812010-04-27 23:55:59 +0000814 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +0000815 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +0000816 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000817 try:
818 # Write out many bytes with exactly the same number of 0's,
819 # 1's... 255's. This will help us check that concurrent reading
820 # doesn't duplicate or forget contents.
821 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000822 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000823 random.shuffle(l)
824 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000825 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000826 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000827 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000828 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000829 errors = []
830 results = []
831 def f():
832 try:
833 # Intra-buffer read then buffer-flushing read
834 for n in cycle([1, 19]):
835 s = bufio.read(n)
836 if not s:
837 break
838 # list.append() is atomic
839 results.append(s)
840 except Exception as e:
841 errors.append(e)
842 raise
843 threads = [threading.Thread(target=f) for x in range(20)]
844 for t in threads:
845 t.start()
846 time.sleep(0.02) # yield
847 for t in threads:
848 t.join()
849 self.assertFalse(errors,
850 "the following exceptions were caught: %r" % errors)
851 s = b''.join(results)
852 for i in range(256):
853 c = bytes(bytearray([i]))
854 self.assertEqual(s.count(c), N)
855 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000856 support.unlink(support.TESTFN)
857
858 def test_misbehaved_io(self):
859 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
860 bufio = self.tp(rawio)
861 self.assertRaises(IOError, bufio.seek, 0)
862 self.assertRaises(IOError, bufio.tell)
863
Antoine Pitroucb4f47c2010-08-11 13:40:17 +0000864 def test_no_extraneous_read(self):
865 # Issue #9550; when the raw IO object has satisfied the read request,
866 # we should not issue any additional reads, otherwise it may block
867 # (e.g. socket).
868 bufsize = 16
869 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
870 rawio = self.MockRawIO([b"x" * n])
871 bufio = self.tp(rawio, bufsize)
872 self.assertEqual(bufio.read(n), b"x" * n)
873 # Simple case: one raw read is enough to satisfy the request.
874 self.assertEqual(rawio._extraneous_reads, 0,
875 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
876 # A more complex case where two raw reads are needed to satisfy
877 # the request.
878 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
879 bufio = self.tp(rawio, bufsize)
880 self.assertEqual(bufio.read(n), b"x" * n)
881 self.assertEqual(rawio._extraneous_reads, 0,
882 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
883
884
Antoine Pitrou19690592009-06-12 20:14:08 +0000885class CBufferedReaderTest(BufferedReaderTest):
886 tp = io.BufferedReader
887
888 def test_constructor(self):
889 BufferedReaderTest.test_constructor(self)
890 # The allocation can succeed on 32-bit builds, e.g. with more
891 # than 2GB RAM and a 64-bit kernel.
892 if sys.maxsize > 0x7FFFFFFF:
893 rawio = self.MockRawIO()
894 bufio = self.tp(rawio)
895 self.assertRaises((OverflowError, MemoryError, ValueError),
896 bufio.__init__, rawio, sys.maxsize)
897
898 def test_initialization(self):
899 rawio = self.MockRawIO([b"abc"])
900 bufio = self.tp(rawio)
901 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
902 self.assertRaises(ValueError, bufio.read)
903 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
904 self.assertRaises(ValueError, bufio.read)
905 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
906 self.assertRaises(ValueError, bufio.read)
907
908 def test_misbehaved_io_read(self):
909 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
910 bufio = self.tp(rawio)
911 # _pyio.BufferedReader seems to implement reading different, so that
912 # checking this is not so easy.
913 self.assertRaises(IOError, bufio.read, 10)
914
915 def test_garbage_collection(self):
916 # C BufferedReader objects are collected.
917 # The Python version has __del__, so it ends into gc.garbage instead
918 rawio = self.FileIO(support.TESTFN, "w+b")
919 f = self.tp(rawio)
920 f.f = f
921 wr = weakref.ref(f)
922 del f
923 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000924 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000925
926class PyBufferedReaderTest(BufferedReaderTest):
927 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000928
929
Antoine Pitrou19690592009-06-12 20:14:08 +0000930class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
931 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000932
Antoine Pitrou19690592009-06-12 20:14:08 +0000933 def test_constructor(self):
934 rawio = self.MockRawIO()
935 bufio = self.tp(rawio)
936 bufio.__init__(rawio)
937 bufio.__init__(rawio, buffer_size=1024)
938 bufio.__init__(rawio, buffer_size=16)
Ezio Melotti2623a372010-11-21 13:34:58 +0000939 self.assertEqual(3, bufio.write(b"abc"))
Antoine Pitrou19690592009-06-12 20:14:08 +0000940 bufio.flush()
941 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
942 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
943 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
944 bufio.__init__(rawio)
Ezio Melotti2623a372010-11-21 13:34:58 +0000945 self.assertEqual(3, bufio.write(b"ghi"))
Antoine Pitrou19690592009-06-12 20:14:08 +0000946 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +0000947 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000948
Antoine Pitrou19690592009-06-12 20:14:08 +0000949 def test_detach_flush(self):
950 raw = self.MockRawIO()
951 buf = self.tp(raw)
952 buf.write(b"howdy!")
953 self.assertFalse(raw._write_stack)
954 buf.detach()
955 self.assertEqual(raw._write_stack, [b"howdy!"])
956
957 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000958 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +0000959 writer = self.MockRawIO()
960 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000961 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000962 self.assertFalse(writer._write_stack)
963
Antoine Pitrou19690592009-06-12 20:14:08 +0000964 def test_write_overflow(self):
965 writer = self.MockRawIO()
966 bufio = self.tp(writer, 8)
967 contents = b"abcdefghijklmnop"
968 for n in range(0, len(contents), 3):
969 bufio.write(contents[n:n+3])
970 flushed = b"".join(writer._write_stack)
971 # At least (total - 8) bytes were implicitly flushed, perhaps more
972 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000973 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000974
Antoine Pitrou19690592009-06-12 20:14:08 +0000975 def check_writes(self, intermediate_func):
976 # Lots of writes, test the flushed output is as expected.
977 contents = bytes(range(256)) * 1000
978 n = 0
979 writer = self.MockRawIO()
980 bufio = self.tp(writer, 13)
981 # Generator of write sizes: repeat each N 15 times then proceed to N+1
982 def gen_sizes():
983 for size in count(1):
984 for i in range(15):
985 yield size
986 sizes = gen_sizes()
987 while n < len(contents):
988 size = min(next(sizes), len(contents) - n)
Ezio Melotti2623a372010-11-21 13:34:58 +0000989 self.assertEqual(bufio.write(contents[n:n+size]), size)
Antoine Pitrou19690592009-06-12 20:14:08 +0000990 intermediate_func(bufio)
991 n += size
992 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +0000993 self.assertEqual(contents,
Antoine Pitrou19690592009-06-12 20:14:08 +0000994 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000995
Antoine Pitrou19690592009-06-12 20:14:08 +0000996 def test_writes(self):
997 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000998
Antoine Pitrou19690592009-06-12 20:14:08 +0000999 def test_writes_and_flushes(self):
1000 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001001
Antoine Pitrou19690592009-06-12 20:14:08 +00001002 def test_writes_and_seeks(self):
1003 def _seekabs(bufio):
1004 pos = bufio.tell()
1005 bufio.seek(pos + 1, 0)
1006 bufio.seek(pos - 1, 0)
1007 bufio.seek(pos, 0)
1008 self.check_writes(_seekabs)
1009 def _seekrel(bufio):
1010 pos = bufio.seek(0, 1)
1011 bufio.seek(+1, 1)
1012 bufio.seek(-1, 1)
1013 bufio.seek(pos, 0)
1014 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001015
Antoine Pitrou19690592009-06-12 20:14:08 +00001016 def test_writes_and_truncates(self):
1017 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001018
Antoine Pitrou19690592009-06-12 20:14:08 +00001019 def test_write_non_blocking(self):
1020 raw = self.MockNonBlockWriterIO()
1021 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001022
Ezio Melotti2623a372010-11-21 13:34:58 +00001023 self.assertEqual(bufio.write(b"abcd"), 4)
1024 self.assertEqual(bufio.write(b"efghi"), 5)
Antoine Pitrou19690592009-06-12 20:14:08 +00001025 # 1 byte will be written, the rest will be buffered
1026 raw.block_on(b"k")
Ezio Melotti2623a372010-11-21 13:34:58 +00001027 self.assertEqual(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001028
Antoine Pitrou19690592009-06-12 20:14:08 +00001029 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1030 raw.block_on(b"0")
1031 try:
1032 bufio.write(b"opqrwxyz0123456789")
1033 except self.BlockingIOError as e:
1034 written = e.characters_written
1035 else:
1036 self.fail("BlockingIOError should have been raised")
Ezio Melotti2623a372010-11-21 13:34:58 +00001037 self.assertEqual(written, 16)
1038 self.assertEqual(raw.pop_written(),
Antoine Pitrou19690592009-06-12 20:14:08 +00001039 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001040
Ezio Melotti2623a372010-11-21 13:34:58 +00001041 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Antoine Pitrou19690592009-06-12 20:14:08 +00001042 s = raw.pop_written()
1043 # Previously buffered bytes were flushed
1044 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001045
Antoine Pitrou19690592009-06-12 20:14:08 +00001046 def test_write_and_rewind(self):
1047 raw = io.BytesIO()
1048 bufio = self.tp(raw, 4)
1049 self.assertEqual(bufio.write(b"abcdef"), 6)
1050 self.assertEqual(bufio.tell(), 6)
1051 bufio.seek(0, 0)
1052 self.assertEqual(bufio.write(b"XY"), 2)
1053 bufio.seek(6, 0)
1054 self.assertEqual(raw.getvalue(), b"XYcdef")
1055 self.assertEqual(bufio.write(b"123456"), 6)
1056 bufio.flush()
1057 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001058
Antoine Pitrou19690592009-06-12 20:14:08 +00001059 def test_flush(self):
1060 writer = self.MockRawIO()
1061 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001062 bufio.write(b"abc")
1063 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001064 self.assertEqual(b"abc", writer._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001065
Antoine Pitrou19690592009-06-12 20:14:08 +00001066 def test_destructor(self):
1067 writer = self.MockRawIO()
1068 bufio = self.tp(writer, 8)
1069 bufio.write(b"abc")
1070 del bufio
1071 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001072 self.assertEqual(b"abc", writer._write_stack[0])
Antoine Pitrou19690592009-06-12 20:14:08 +00001073
1074 def test_truncate(self):
1075 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001076 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001077 bufio = self.tp(raw, 8)
1078 bufio.write(b"abcdef")
1079 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001080 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001081 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001082 self.assertEqual(f.read(), b"abc")
1083
Victor Stinner6a102812010-04-27 23:55:59 +00001084 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitroud989f822010-10-14 15:43:25 +00001085 @support.requires_resource('cpu')
Antoine Pitrou19690592009-06-12 20:14:08 +00001086 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001087 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001088 # Write out many bytes from many threads and test they were
1089 # all flushed.
1090 N = 1000
1091 contents = bytes(range(256)) * N
1092 sizes = cycle([1, 19])
1093 n = 0
1094 queue = deque()
1095 while n < len(contents):
1096 size = next(sizes)
1097 queue.append(contents[n:n+size])
1098 n += size
1099 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001100 # We use a real file object because it allows us to
1101 # exercise situations where the GIL is released before
1102 # writing the buffer to the raw streams. This is in addition
1103 # to concurrency issues due to switching threads in the middle
1104 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001105 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001106 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001107 errors = []
1108 def f():
1109 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001110 while True:
1111 try:
1112 s = queue.popleft()
1113 except IndexError:
1114 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001115 bufio.write(s)
1116 except Exception as e:
1117 errors.append(e)
1118 raise
1119 threads = [threading.Thread(target=f) for x in range(20)]
1120 for t in threads:
1121 t.start()
1122 time.sleep(0.02) # yield
1123 for t in threads:
1124 t.join()
1125 self.assertFalse(errors,
1126 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001127 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001128 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001129 s = f.read()
1130 for i in range(256):
Ezio Melotti2623a372010-11-21 13:34:58 +00001131 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001132 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001133 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001134
Antoine Pitrou19690592009-06-12 20:14:08 +00001135 def test_misbehaved_io(self):
1136 rawio = self.MisbehavedRawIO()
1137 bufio = self.tp(rawio, 5)
1138 self.assertRaises(IOError, bufio.seek, 0)
1139 self.assertRaises(IOError, bufio.tell)
1140 self.assertRaises(IOError, bufio.write, b"abcdef")
1141
1142 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001143 with support.check_warnings(("max_buffer_size is deprecated",
1144 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001145 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001146
1147
1148class CBufferedWriterTest(BufferedWriterTest):
1149 tp = io.BufferedWriter
1150
1151 def test_constructor(self):
1152 BufferedWriterTest.test_constructor(self)
1153 # The allocation can succeed on 32-bit builds, e.g. with more
1154 # than 2GB RAM and a 64-bit kernel.
1155 if sys.maxsize > 0x7FFFFFFF:
1156 rawio = self.MockRawIO()
1157 bufio = self.tp(rawio)
1158 self.assertRaises((OverflowError, MemoryError, ValueError),
1159 bufio.__init__, rawio, sys.maxsize)
1160
1161 def test_initialization(self):
1162 rawio = self.MockRawIO()
1163 bufio = self.tp(rawio)
1164 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1165 self.assertRaises(ValueError, bufio.write, b"def")
1166 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1167 self.assertRaises(ValueError, bufio.write, b"def")
1168 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1169 self.assertRaises(ValueError, bufio.write, b"def")
1170
1171 def test_garbage_collection(self):
1172 # C BufferedWriter objects are collected, and collecting them flushes
1173 # all data to disk.
1174 # The Python version has __del__, so it ends into gc.garbage instead
1175 rawio = self.FileIO(support.TESTFN, "w+b")
1176 f = self.tp(rawio)
1177 f.write(b"123xxx")
1178 f.x = f
1179 wr = weakref.ref(f)
1180 del f
1181 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001182 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001183 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001184 self.assertEqual(f.read(), b"123xxx")
1185
1186
1187class PyBufferedWriterTest(BufferedWriterTest):
1188 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001189
1190class BufferedRWPairTest(unittest.TestCase):
1191
Antoine Pitrou19690592009-06-12 20:14:08 +00001192 def test_constructor(self):
1193 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001194 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001195
Antoine Pitrou19690592009-06-12 20:14:08 +00001196 def test_detach(self):
1197 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1198 self.assertRaises(self.UnsupportedOperation, pair.detach)
1199
1200 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001201 with support.check_warnings(("max_buffer_size is deprecated",
1202 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001203 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001204
1205 def test_constructor_with_not_readable(self):
1206 class NotReadable(MockRawIO):
1207 def readable(self):
1208 return False
1209
1210 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1211
1212 def test_constructor_with_not_writeable(self):
1213 class NotWriteable(MockRawIO):
1214 def writable(self):
1215 return False
1216
1217 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1218
1219 def test_read(self):
1220 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1221
1222 self.assertEqual(pair.read(3), b"abc")
1223 self.assertEqual(pair.read(1), b"d")
1224 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001225 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1226 self.assertEqual(pair.read(None), b"abc")
1227
1228 def test_readlines(self):
1229 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1230 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1231 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1232 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001233
1234 def test_read1(self):
1235 # .read1() is delegated to the underlying reader object, so this test
1236 # can be shallow.
1237 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1238
1239 self.assertEqual(pair.read1(3), b"abc")
1240
1241 def test_readinto(self):
1242 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1243
1244 data = bytearray(5)
1245 self.assertEqual(pair.readinto(data), 5)
1246 self.assertEqual(data, b"abcde")
1247
1248 def test_write(self):
1249 w = self.MockRawIO()
1250 pair = self.tp(self.MockRawIO(), w)
1251
1252 pair.write(b"abc")
1253 pair.flush()
1254 pair.write(b"def")
1255 pair.flush()
1256 self.assertEqual(w._write_stack, [b"abc", b"def"])
1257
1258 def test_peek(self):
1259 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1260
1261 self.assertTrue(pair.peek(3).startswith(b"abc"))
1262 self.assertEqual(pair.read(3), b"abc")
1263
1264 def test_readable(self):
1265 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1266 self.assertTrue(pair.readable())
1267
1268 def test_writeable(self):
1269 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1270 self.assertTrue(pair.writable())
1271
1272 def test_seekable(self):
1273 # BufferedRWPairs are never seekable, even if their readers and writers
1274 # are.
1275 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1276 self.assertFalse(pair.seekable())
1277
1278 # .flush() is delegated to the underlying writer object and has been
1279 # tested in the test_write method.
1280
1281 def test_close_and_closed(self):
1282 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1283 self.assertFalse(pair.closed)
1284 pair.close()
1285 self.assertTrue(pair.closed)
1286
1287 def test_isatty(self):
1288 class SelectableIsAtty(MockRawIO):
1289 def __init__(self, isatty):
1290 MockRawIO.__init__(self)
1291 self._isatty = isatty
1292
1293 def isatty(self):
1294 return self._isatty
1295
1296 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1297 self.assertFalse(pair.isatty())
1298
1299 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1300 self.assertTrue(pair.isatty())
1301
1302 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1303 self.assertTrue(pair.isatty())
1304
1305 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1306 self.assertTrue(pair.isatty())
1307
1308class CBufferedRWPairTest(BufferedRWPairTest):
1309 tp = io.BufferedRWPair
1310
1311class PyBufferedRWPairTest(BufferedRWPairTest):
1312 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001313
1314
Antoine Pitrou19690592009-06-12 20:14:08 +00001315class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1316 read_mode = "rb+"
1317 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001318
Antoine Pitrou19690592009-06-12 20:14:08 +00001319 def test_constructor(self):
1320 BufferedReaderTest.test_constructor(self)
1321 BufferedWriterTest.test_constructor(self)
1322
1323 def test_read_and_write(self):
1324 raw = self.MockRawIO((b"asdf", b"ghjk"))
1325 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001326
1327 self.assertEqual(b"as", rw.read(2))
1328 rw.write(b"ddd")
1329 rw.write(b"eee")
1330 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001331 self.assertEqual(b"ghjk", rw.read())
Ezio Melotti2623a372010-11-21 13:34:58 +00001332 self.assertEqual(b"dddeee", raw._write_stack[0])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001333
Antoine Pitrou19690592009-06-12 20:14:08 +00001334 def test_seek_and_tell(self):
1335 raw = self.BytesIO(b"asdfghjkl")
1336 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001337
Ezio Melotti2623a372010-11-21 13:34:58 +00001338 self.assertEqual(b"as", rw.read(2))
1339 self.assertEqual(2, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001340 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001341 self.assertEqual(b"asdf", rw.read(4))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001342
1343 rw.write(b"asdf")
1344 rw.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001345 self.assertEqual(b"asdfasdfl", rw.read())
1346 self.assertEqual(9, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001347 rw.seek(-4, 2)
Ezio Melotti2623a372010-11-21 13:34:58 +00001348 self.assertEqual(5, rw.tell())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001349 rw.seek(2, 1)
Ezio Melotti2623a372010-11-21 13:34:58 +00001350 self.assertEqual(7, rw.tell())
1351 self.assertEqual(b"fl", rw.read(11))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001352 self.assertRaises(TypeError, rw.seek, 0.0)
1353
Antoine Pitrou19690592009-06-12 20:14:08 +00001354 def check_flush_and_read(self, read_func):
1355 raw = self.BytesIO(b"abcdefghi")
1356 bufio = self.tp(raw)
1357
Ezio Melotti2623a372010-11-21 13:34:58 +00001358 self.assertEqual(b"ab", read_func(bufio, 2))
Antoine Pitrou19690592009-06-12 20:14:08 +00001359 bufio.write(b"12")
Ezio Melotti2623a372010-11-21 13:34:58 +00001360 self.assertEqual(b"ef", read_func(bufio, 2))
1361 self.assertEqual(6, bufio.tell())
Antoine Pitrou19690592009-06-12 20:14:08 +00001362 bufio.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001363 self.assertEqual(6, bufio.tell())
1364 self.assertEqual(b"ghi", read_func(bufio))
Antoine Pitrou19690592009-06-12 20:14:08 +00001365 raw.seek(0, 0)
1366 raw.write(b"XYZ")
1367 # flush() resets the read buffer
1368 bufio.flush()
1369 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001370 self.assertEqual(b"XYZ", read_func(bufio, 3))
Antoine Pitrou19690592009-06-12 20:14:08 +00001371
1372 def test_flush_and_read(self):
1373 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1374
1375 def test_flush_and_readinto(self):
1376 def _readinto(bufio, n=-1):
1377 b = bytearray(n if n >= 0 else 9999)
1378 n = bufio.readinto(b)
1379 return bytes(b[:n])
1380 self.check_flush_and_read(_readinto)
1381
1382 def test_flush_and_peek(self):
1383 def _peek(bufio, n=-1):
1384 # This relies on the fact that the buffer can contain the whole
1385 # raw stream, otherwise peek() can return less.
1386 b = bufio.peek(n)
1387 if n != -1:
1388 b = b[:n]
1389 bufio.seek(len(b), 1)
1390 return b
1391 self.check_flush_and_read(_peek)
1392
1393 def test_flush_and_write(self):
1394 raw = self.BytesIO(b"abcdefghi")
1395 bufio = self.tp(raw)
1396
1397 bufio.write(b"123")
1398 bufio.flush()
1399 bufio.write(b"45")
1400 bufio.flush()
1401 bufio.seek(0, 0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001402 self.assertEqual(b"12345fghi", raw.getvalue())
1403 self.assertEqual(b"12345fghi", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +00001404
1405 def test_threads(self):
1406 BufferedReaderTest.test_threads(self)
1407 BufferedWriterTest.test_threads(self)
1408
1409 def test_writes_and_peek(self):
1410 def _peek(bufio):
1411 bufio.peek(1)
1412 self.check_writes(_peek)
1413 def _peek(bufio):
1414 pos = bufio.tell()
1415 bufio.seek(-1, 1)
1416 bufio.peek(1)
1417 bufio.seek(pos, 0)
1418 self.check_writes(_peek)
1419
1420 def test_writes_and_reads(self):
1421 def _read(bufio):
1422 bufio.seek(-1, 1)
1423 bufio.read(1)
1424 self.check_writes(_read)
1425
1426 def test_writes_and_read1s(self):
1427 def _read1(bufio):
1428 bufio.seek(-1, 1)
1429 bufio.read1(1)
1430 self.check_writes(_read1)
1431
1432 def test_writes_and_readintos(self):
1433 def _read(bufio):
1434 bufio.seek(-1, 1)
1435 bufio.readinto(bytearray(1))
1436 self.check_writes(_read)
1437
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001438 def test_write_after_readahead(self):
1439 # Issue #6629: writing after the buffer was filled by readahead should
1440 # first rewind the raw stream.
1441 for overwrite_size in [1, 5]:
1442 raw = self.BytesIO(b"A" * 10)
1443 bufio = self.tp(raw, 4)
1444 # Trigger readahead
1445 self.assertEqual(bufio.read(1), b"A")
1446 self.assertEqual(bufio.tell(), 1)
1447 # Overwriting should rewind the raw stream if it needs so
1448 bufio.write(b"B" * overwrite_size)
1449 self.assertEqual(bufio.tell(), overwrite_size + 1)
1450 # If the write size was smaller than the buffer size, flush() and
1451 # check that rewind happens.
1452 bufio.flush()
1453 self.assertEqual(bufio.tell(), overwrite_size + 1)
1454 s = raw.getvalue()
1455 self.assertEqual(s,
1456 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1457
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001458 def test_truncate_after_read_or_write(self):
1459 raw = self.BytesIO(b"A" * 10)
1460 bufio = self.tp(raw, 100)
1461 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1462 self.assertEqual(bufio.truncate(), 2)
1463 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1464 self.assertEqual(bufio.truncate(), 4)
1465
Antoine Pitrou19690592009-06-12 20:14:08 +00001466 def test_misbehaved_io(self):
1467 BufferedReaderTest.test_misbehaved_io(self)
1468 BufferedWriterTest.test_misbehaved_io(self)
1469
1470class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
1471 tp = io.BufferedRandom
1472
1473 def test_constructor(self):
1474 BufferedRandomTest.test_constructor(self)
1475 # The allocation can succeed on 32-bit builds, e.g. with more
1476 # than 2GB RAM and a 64-bit kernel.
1477 if sys.maxsize > 0x7FFFFFFF:
1478 rawio = self.MockRawIO()
1479 bufio = self.tp(rawio)
1480 self.assertRaises((OverflowError, MemoryError, ValueError),
1481 bufio.__init__, rawio, sys.maxsize)
1482
1483 def test_garbage_collection(self):
1484 CBufferedReaderTest.test_garbage_collection(self)
1485 CBufferedWriterTest.test_garbage_collection(self)
1486
1487class PyBufferedRandomTest(BufferedRandomTest):
1488 tp = pyio.BufferedRandom
1489
1490
Christian Heimes1a6387e2008-03-26 12:49:49 +00001491# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1492# properties:
1493# - A single output character can correspond to many bytes of input.
1494# - The number of input bytes to complete the character can be
1495# undetermined until the last input byte is received.
1496# - The number of input bytes can vary depending on previous input.
1497# - A single input byte can correspond to many characters of output.
1498# - The number of output characters can be undetermined until the
1499# last input byte is received.
1500# - The number of output characters can vary depending on previous input.
1501
1502class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1503 """
1504 For testing seek/tell behavior with a stateful, buffering decoder.
1505
1506 Input is a sequence of words. Words may be fixed-length (length set
1507 by input) or variable-length (period-terminated). In variable-length
1508 mode, extra periods are ignored. Possible words are:
1509 - 'i' followed by a number sets the input length, I (maximum 99).
1510 When I is set to 0, words are space-terminated.
1511 - 'o' followed by a number sets the output length, O (maximum 99).
1512 - Any other word is converted into a word followed by a period on
1513 the output. The output word consists of the input word truncated
1514 or padded out with hyphens to make its length equal to O. If O
1515 is 0, the word is output verbatim without truncating or padding.
1516 I and O are initially set to 1. When I changes, any buffered input is
1517 re-scanned according to the new I. EOF also terminates the last word.
1518 """
1519
1520 def __init__(self, errors='strict'):
1521 codecs.IncrementalDecoder.__init__(self, errors)
1522 self.reset()
1523
1524 def __repr__(self):
1525 return '<SID %x>' % id(self)
1526
1527 def reset(self):
1528 self.i = 1
1529 self.o = 1
1530 self.buffer = bytearray()
1531
1532 def getstate(self):
1533 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1534 return bytes(self.buffer), i*100 + o
1535
1536 def setstate(self, state):
1537 buffer, io = state
1538 self.buffer = bytearray(buffer)
1539 i, o = divmod(io, 100)
1540 self.i, self.o = i ^ 1, o ^ 1
1541
1542 def decode(self, input, final=False):
1543 output = ''
1544 for b in input:
1545 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001546 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001547 if self.buffer:
1548 output += self.process_word()
1549 else:
1550 self.buffer.append(b)
1551 else: # fixed-length, terminate after self.i bytes
1552 self.buffer.append(b)
1553 if len(self.buffer) == self.i:
1554 output += self.process_word()
1555 if final and self.buffer: # EOF terminates the last word
1556 output += self.process_word()
1557 return output
1558
1559 def process_word(self):
1560 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001561 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001562 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001563 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001564 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1565 else:
1566 output = self.buffer.decode('ascii')
1567 if len(output) < self.o:
1568 output += '-'*self.o # pad out with hyphens
1569 if self.o:
1570 output = output[:self.o] # truncate to output length
1571 output += '.'
1572 self.buffer = bytearray()
1573 return output
1574
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001575 codecEnabled = False
1576
1577 @classmethod
1578 def lookupTestDecoder(cls, name):
1579 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001580 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001581 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001582 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001583 incrementalencoder=None,
1584 streamreader=None, streamwriter=None,
1585 incrementaldecoder=cls)
1586
1587# Register the previous decoder for testing.
1588# Disabled by default, tests will enable it.
1589codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1590
1591
Christian Heimes1a6387e2008-03-26 12:49:49 +00001592class StatefulIncrementalDecoderTest(unittest.TestCase):
1593 """
1594 Make sure the StatefulIncrementalDecoder actually works.
1595 """
1596
1597 test_cases = [
1598 # I=1, O=1 (fixed-length input == fixed-length output)
1599 (b'abcd', False, 'a.b.c.d.'),
1600 # I=0, O=0 (variable-length input, variable-length output)
1601 (b'oiabcd', True, 'abcd.'),
1602 # I=0, O=0 (should ignore extra periods)
1603 (b'oi...abcd...', True, 'abcd.'),
1604 # I=0, O=6 (variable-length input, fixed-length output)
1605 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1606 # I=2, O=6 (fixed-length input < fixed-length output)
1607 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1608 # I=6, O=3 (fixed-length input > fixed-length output)
1609 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1610 # I=0, then 3; O=29, then 15 (with longer output)
1611 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1612 'a----------------------------.' +
1613 'b----------------------------.' +
1614 'cde--------------------------.' +
1615 'abcdefghijabcde.' +
1616 'a.b------------.' +
1617 '.c.------------.' +
1618 'd.e------------.' +
1619 'k--------------.' +
1620 'l--------------.' +
1621 'm--------------.')
1622 ]
1623
Antoine Pitrou19690592009-06-12 20:14:08 +00001624 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001625 # Try a few one-shot test cases.
1626 for input, eof, output in self.test_cases:
1627 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001628 self.assertEqual(d.decode(input, eof), output)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001629
1630 # Also test an unfinished decode, followed by forcing EOF.
1631 d = StatefulIncrementalDecoder()
Ezio Melotti2623a372010-11-21 13:34:58 +00001632 self.assertEqual(d.decode(b'oiabcd'), '')
1633 self.assertEqual(d.decode(b'', 1), 'abcd.')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001634
1635class TextIOWrapperTest(unittest.TestCase):
1636
1637 def setUp(self):
1638 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1639 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001640 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001641
1642 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001643 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001644
Antoine Pitrou19690592009-06-12 20:14:08 +00001645 def test_constructor(self):
1646 r = self.BytesIO(b"\xc3\xa9\n\n")
1647 b = self.BufferedReader(r, 1000)
1648 t = self.TextIOWrapper(b)
1649 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melotti2623a372010-11-21 13:34:58 +00001650 self.assertEqual(t.encoding, "latin1")
1651 self.assertEqual(t.line_buffering, False)
Antoine Pitrou19690592009-06-12 20:14:08 +00001652 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melotti2623a372010-11-21 13:34:58 +00001653 self.assertEqual(t.encoding, "utf8")
1654 self.assertEqual(t.line_buffering, True)
1655 self.assertEqual("\xe9\n", t.readline())
Antoine Pitrou19690592009-06-12 20:14:08 +00001656 self.assertRaises(TypeError, t.__init__, b, newline=42)
1657 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1658
1659 def test_detach(self):
1660 r = self.BytesIO()
1661 b = self.BufferedWriter(r)
1662 t = self.TextIOWrapper(b)
1663 self.assertIs(t.detach(), b)
1664
1665 t = self.TextIOWrapper(b, encoding="ascii")
1666 t.write("howdy")
1667 self.assertFalse(r.getvalue())
1668 t.detach()
1669 self.assertEqual(r.getvalue(), b"howdy")
1670 self.assertRaises(ValueError, t.detach)
1671
1672 def test_repr(self):
1673 raw = self.BytesIO("hello".encode("utf-8"))
1674 b = self.BufferedReader(raw)
1675 t = self.TextIOWrapper(b, encoding="utf-8")
1676 modname = self.TextIOWrapper.__module__
1677 self.assertEqual(repr(t),
1678 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1679 raw.name = "dummy"
1680 self.assertEqual(repr(t),
1681 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1682 raw.name = b"dummy"
1683 self.assertEqual(repr(t),
1684 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1685
1686 def test_line_buffering(self):
1687 r = self.BytesIO()
1688 b = self.BufferedWriter(r, 1000)
1689 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1690 t.write("X")
Ezio Melotti2623a372010-11-21 13:34:58 +00001691 self.assertEqual(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001692 t.write("Y\nZ")
Ezio Melotti2623a372010-11-21 13:34:58 +00001693 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001694 t.write("A\rB")
Ezio Melotti2623a372010-11-21 13:34:58 +00001695 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001696
Antoine Pitrou19690592009-06-12 20:14:08 +00001697 def test_encoding(self):
1698 # Check the encoding attribute is always set, and valid
1699 b = self.BytesIO()
1700 t = self.TextIOWrapper(b, encoding="utf8")
1701 self.assertEqual(t.encoding, "utf8")
1702 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001703 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001704 codecs.lookup(t.encoding)
1705
1706 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001707 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001708 b = self.BytesIO(b"abc\n\xff\n")
1709 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001710 self.assertRaises(UnicodeError, t.read)
1711 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001712 b = self.BytesIO(b"abc\n\xff\n")
1713 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001714 self.assertRaises(UnicodeError, t.read)
1715 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001716 b = self.BytesIO(b"abc\n\xff\n")
1717 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melotti2623a372010-11-21 13:34:58 +00001718 self.assertEqual(t.read(), "abc\n\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001719 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001720 b = self.BytesIO(b"abc\n\xff\n")
1721 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melotti2623a372010-11-21 13:34:58 +00001722 self.assertEqual(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001723
Antoine Pitrou19690592009-06-12 20:14:08 +00001724 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001725 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001726 b = self.BytesIO()
1727 t = self.TextIOWrapper(b, encoding="ascii")
1728 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001729 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001730 b = self.BytesIO()
1731 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1732 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001733 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001734 b = self.BytesIO()
1735 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001736 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001737 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001738 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001739 self.assertEqual(b.getvalue(), b"abcdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001740 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001741 b = self.BytesIO()
1742 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001743 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001744 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001745 t.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001746 self.assertEqual(b.getvalue(), b"abc?def\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001747
Antoine Pitrou19690592009-06-12 20:14:08 +00001748 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001749 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1750
1751 tests = [
1752 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1753 [ '', input_lines ],
1754 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1755 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1756 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1757 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001758 encodings = (
1759 'utf-8', 'latin-1',
1760 'utf-16', 'utf-16-le', 'utf-16-be',
1761 'utf-32', 'utf-32-le', 'utf-32-be',
1762 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001763
1764 # Try a range of buffer sizes to test the case where \r is the last
1765 # character in TextIOWrapper._pending_line.
1766 for encoding in encodings:
1767 # XXX: str.encode() should return bytes
1768 data = bytes(''.join(input_lines).encode(encoding))
1769 for do_reads in (False, True):
1770 for bufsize in range(1, 10):
1771 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001772 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1773 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001774 encoding=encoding)
1775 if do_reads:
1776 got_lines = []
1777 while True:
1778 c2 = textio.read(2)
1779 if c2 == '':
1780 break
Ezio Melotti2623a372010-11-21 13:34:58 +00001781 self.assertEqual(len(c2), 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001782 got_lines.append(c2 + textio.readline())
1783 else:
1784 got_lines = list(textio)
1785
1786 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melotti2623a372010-11-21 13:34:58 +00001787 self.assertEqual(got_line, exp_line)
1788 self.assertEqual(len(got_lines), len(exp_lines))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001789
Antoine Pitrou19690592009-06-12 20:14:08 +00001790 def test_newlines_input(self):
1791 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001792 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1793 for newline, expected in [
1794 (None, normalized.decode("ascii").splitlines(True)),
1795 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001796 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1797 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1798 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001799 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001800 buf = self.BytesIO(testdata)
1801 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melotti2623a372010-11-21 13:34:58 +00001802 self.assertEqual(txt.readlines(), expected)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001803 txt.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001804 self.assertEqual(txt.read(), "".join(expected))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001805
Antoine Pitrou19690592009-06-12 20:14:08 +00001806 def test_newlines_output(self):
1807 testdict = {
1808 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1809 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1810 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1811 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1812 }
1813 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1814 for newline, expected in tests:
1815 buf = self.BytesIO()
1816 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1817 txt.write("AAA\nB")
1818 txt.write("BB\nCCC\n")
1819 txt.write("X\rY\r\nZ")
1820 txt.flush()
Ezio Melotti2623a372010-11-21 13:34:58 +00001821 self.assertEqual(buf.closed, False)
1822 self.assertEqual(buf.getvalue(), expected)
Antoine Pitrou19690592009-06-12 20:14:08 +00001823
1824 def test_destructor(self):
1825 l = []
1826 base = self.BytesIO
1827 class MyBytesIO(base):
1828 def close(self):
1829 l.append(self.getvalue())
1830 base.close(self)
1831 b = MyBytesIO()
1832 t = self.TextIOWrapper(b, encoding="ascii")
1833 t.write("abc")
1834 del t
1835 support.gc_collect()
Ezio Melotti2623a372010-11-21 13:34:58 +00001836 self.assertEqual([b"abc"], l)
Antoine Pitrou19690592009-06-12 20:14:08 +00001837
1838 def test_override_destructor(self):
1839 record = []
1840 class MyTextIO(self.TextIOWrapper):
1841 def __del__(self):
1842 record.append(1)
1843 try:
1844 f = super(MyTextIO, self).__del__
1845 except AttributeError:
1846 pass
1847 else:
1848 f()
1849 def close(self):
1850 record.append(2)
1851 super(MyTextIO, self).close()
1852 def flush(self):
1853 record.append(3)
1854 super(MyTextIO, self).flush()
1855 b = self.BytesIO()
1856 t = MyTextIO(b, encoding="ascii")
1857 del t
1858 support.gc_collect()
1859 self.assertEqual(record, [1, 2, 3])
1860
1861 def test_error_through_destructor(self):
1862 # Test that the exception state is not modified by a destructor,
1863 # even if close() fails.
1864 rawio = self.CloseFailureIO()
1865 def f():
1866 self.TextIOWrapper(rawio).xyzzy
1867 with support.captured_output("stderr") as s:
1868 self.assertRaises(AttributeError, f)
1869 s = s.getvalue().strip()
1870 if s:
1871 # The destructor *may* have printed an unraisable error, check it
1872 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001873 self.assertTrue(s.startswith("Exception IOError: "), s)
1874 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001875
1876 # Systematic tests of the text I/O API
1877
Antoine Pitrou19690592009-06-12 20:14:08 +00001878 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001879 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1880 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00001881 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001882 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00001883 self.assertEqual(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001884 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001885 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001886 f._CHUNK_SIZE = chunksize
Ezio Melotti2623a372010-11-21 13:34:58 +00001887 self.assertEqual(f.tell(), 0)
1888 self.assertEqual(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001889 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00001890 self.assertEqual(f.seek(0), 0)
1891 self.assertEqual(f.read(None), "abc")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001892 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001893 self.assertEqual(f.read(2), "ab")
1894 self.assertEqual(f.read(1), "c")
1895 self.assertEqual(f.read(1), "")
1896 self.assertEqual(f.read(), "")
1897 self.assertEqual(f.tell(), cookie)
1898 self.assertEqual(f.seek(0), 0)
1899 self.assertEqual(f.seek(0, 2), cookie)
1900 self.assertEqual(f.write("def"), 3)
1901 self.assertEqual(f.seek(cookie), cookie)
1902 self.assertEqual(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001903 if enc.startswith("utf"):
1904 self.multi_line_test(f, enc)
1905 f.close()
1906
1907 def multi_line_test(self, f, enc):
1908 f.seek(0)
1909 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00001910 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001911 wlines = []
1912 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1913 chars = []
1914 for i in range(size):
1915 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00001916 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001917 wlines.append((f.tell(), line))
1918 f.write(line)
1919 f.seek(0)
1920 rlines = []
1921 while True:
1922 pos = f.tell()
1923 line = f.readline()
1924 if not line:
1925 break
1926 rlines.append((pos, line))
Ezio Melotti2623a372010-11-21 13:34:58 +00001927 self.assertEqual(rlines, wlines)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001928
Antoine Pitrou19690592009-06-12 20:14:08 +00001929 def test_telling(self):
1930 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001931 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001932 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001933 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001934 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001935 p2 = f.tell()
1936 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00001937 self.assertEqual(f.tell(), p0)
1938 self.assertEqual(f.readline(), "\xff\n")
1939 self.assertEqual(f.tell(), p1)
1940 self.assertEqual(f.readline(), "\xff\n")
1941 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001942 f.seek(0)
1943 for line in f:
Ezio Melotti2623a372010-11-21 13:34:58 +00001944 self.assertEqual(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001945 self.assertRaises(IOError, f.tell)
Ezio Melotti2623a372010-11-21 13:34:58 +00001946 self.assertEqual(f.tell(), p2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001947 f.close()
1948
Antoine Pitrou19690592009-06-12 20:14:08 +00001949 def test_seeking(self):
1950 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00001951 prefix_size = chunk_size - 2
1952 u_prefix = "a" * prefix_size
1953 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melotti2623a372010-11-21 13:34:58 +00001954 self.assertEqual(len(u_prefix), len(prefix))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001955 u_suffix = "\u8888\n"
1956 suffix = bytes(u_suffix.encode("utf-8"))
1957 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00001958 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001959 f.write(line*2)
1960 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001961 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001962 s = f.read(prefix_size)
Ezio Melotti2623a372010-11-21 13:34:58 +00001963 self.assertEqual(s, prefix.decode("ascii"))
1964 self.assertEqual(f.tell(), prefix_size)
1965 self.assertEqual(f.readline(), u_suffix)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001966
Antoine Pitrou19690592009-06-12 20:14:08 +00001967 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001968 # Regression test for a specific bug
1969 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00001970 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001971 f.write(data)
1972 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001973 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001974 f._CHUNK_SIZE # Just test that it exists
1975 f._CHUNK_SIZE = 2
1976 f.readline()
1977 f.tell()
1978
Antoine Pitrou19690592009-06-12 20:14:08 +00001979 def test_seek_and_tell(self):
1980 #Test seek/tell using the StatefulIncrementalDecoder.
1981 # Make test faster by doing smaller seeks
1982 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00001983
Antoine Pitrou19690592009-06-12 20:14:08 +00001984 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001985 """Tell/seek to various points within a data stream and ensure
1986 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00001987 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001988 f.write(data)
1989 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001990 f = self.open(support.TESTFN, encoding='test_decoder')
1991 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00001992 decoded = f.read()
1993 f.close()
1994
1995 for i in range(min_pos, len(decoded) + 1): # seek positions
1996 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00001997 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melotti2623a372010-11-21 13:34:58 +00001998 self.assertEqual(f.read(i), decoded[:i])
Christian Heimes1a6387e2008-03-26 12:49:49 +00001999 cookie = f.tell()
Ezio Melotti2623a372010-11-21 13:34:58 +00002000 self.assertEqual(f.read(j), decoded[i:i + j])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002001 f.seek(cookie)
Ezio Melotti2623a372010-11-21 13:34:58 +00002002 self.assertEqual(f.read(), decoded[i:])
Christian Heimes1a6387e2008-03-26 12:49:49 +00002003 f.close()
2004
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002005 # Enable the test decoder.
2006 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00002007
2008 # Run the tests.
2009 try:
2010 # Try each test case.
2011 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00002012 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002013
2014 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00002015 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2016 offset = CHUNK_SIZE - len(input)//2
2017 prefix = b'.'*offset
2018 # Don't bother seeking into the prefix (takes too long).
2019 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00002020 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002021
2022 # Ensure our test decoder won't interfere with subsequent tests.
2023 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00002024 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00002025
Antoine Pitrou19690592009-06-12 20:14:08 +00002026 def test_encoded_writes(self):
2027 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00002028 tests = ("utf-16",
2029 "utf-16-le",
2030 "utf-16-be",
2031 "utf-32",
2032 "utf-32-le",
2033 "utf-32-be")
2034 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00002035 buf = self.BytesIO()
2036 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002037 # Check if the BOM is written only once (see issue1753).
2038 f.write(data)
2039 f.write(data)
2040 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002041 self.assertEqual(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00002042 f.seek(0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002043 self.assertEqual(f.read(), data * 2)
2044 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002045
Antoine Pitrou19690592009-06-12 20:14:08 +00002046 def test_unreadable(self):
2047 class UnReadable(self.BytesIO):
2048 def readable(self):
2049 return False
2050 txt = self.TextIOWrapper(UnReadable())
2051 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002052
Antoine Pitrou19690592009-06-12 20:14:08 +00002053 def test_read_one_by_one(self):
2054 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002055 reads = ""
2056 while True:
2057 c = txt.read(1)
2058 if not c:
2059 break
2060 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002061 self.assertEqual(reads, "AA\nBB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002062
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002063 def test_readlines(self):
2064 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2065 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2066 txt.seek(0)
2067 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2068 txt.seek(0)
2069 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2070
Christian Heimes1a6387e2008-03-26 12:49:49 +00002071 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002072 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002073 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002074 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002075 reads = ""
2076 while True:
2077 c = txt.read(128)
2078 if not c:
2079 break
2080 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002081 self.assertEqual(reads, "A"*127+"\nB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002082
2083 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002084 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002085
2086 # read one char at a time
2087 reads = ""
2088 while True:
2089 c = txt.read(1)
2090 if not c:
2091 break
2092 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002093 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002094
2095 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002096 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002097 txt._CHUNK_SIZE = 4
2098
2099 reads = ""
2100 while True:
2101 c = txt.read(4)
2102 if not c:
2103 break
2104 reads += c
Ezio Melotti2623a372010-11-21 13:34:58 +00002105 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002106
2107 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002108 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002109 txt._CHUNK_SIZE = 4
2110
2111 reads = txt.read(4)
2112 reads += txt.read(4)
2113 reads += txt.readline()
2114 reads += txt.readline()
2115 reads += txt.readline()
Ezio Melotti2623a372010-11-21 13:34:58 +00002116 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002117
2118 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002119 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002120 txt._CHUNK_SIZE = 4
2121
2122 reads = txt.read(4)
2123 reads += txt.read()
Ezio Melotti2623a372010-11-21 13:34:58 +00002124 self.assertEqual(reads, self.normalized)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002125
2126 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002127 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002128 txt._CHUNK_SIZE = 4
2129
2130 reads = txt.read(4)
2131 pos = txt.tell()
2132 txt.seek(0)
2133 txt.seek(pos)
Ezio Melotti2623a372010-11-21 13:34:58 +00002134 self.assertEqual(txt.read(4), "BBB\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002135
2136 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002137 buffer = self.BytesIO(self.testdata)
2138 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002139
2140 self.assertEqual(buffer.seekable(), txt.seekable())
2141
Antoine Pitrou19690592009-06-12 20:14:08 +00002142 def test_append_bom(self):
2143 # The BOM is not written again when appending to a non-empty file
2144 filename = support.TESTFN
2145 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2146 with self.open(filename, 'w', encoding=charset) as f:
2147 f.write('aaa')
2148 pos = f.tell()
2149 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002150 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002151
2152 with self.open(filename, 'a', encoding=charset) as f:
2153 f.write('xxx')
2154 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002155 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002156
Antoine Pitrou19690592009-06-12 20:14:08 +00002157 def test_seek_bom(self):
2158 # Same test, but when seeking manually
2159 filename = support.TESTFN
2160 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2161 with self.open(filename, 'w', encoding=charset) as f:
2162 f.write('aaa')
2163 pos = f.tell()
2164 with self.open(filename, 'r+', encoding=charset) as f:
2165 f.seek(pos)
2166 f.write('zzz')
2167 f.seek(0)
2168 f.write('bbb')
2169 with self.open(filename, 'rb') as f:
Ezio Melotti2623a372010-11-21 13:34:58 +00002170 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitrou19690592009-06-12 20:14:08 +00002171
2172 def test_errors_property(self):
2173 with self.open(support.TESTFN, "w") as f:
2174 self.assertEqual(f.errors, "strict")
2175 with self.open(support.TESTFN, "w", errors="replace") as f:
2176 self.assertEqual(f.errors, "replace")
2177
Victor Stinner6a102812010-04-27 23:55:59 +00002178 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002179 def test_threads_write(self):
2180 # Issue6750: concurrent writes could duplicate data
2181 event = threading.Event()
2182 with self.open(support.TESTFN, "w", buffering=1) as f:
2183 def run(n):
2184 text = "Thread%03d\n" % n
2185 event.wait()
2186 f.write(text)
2187 threads = [threading.Thread(target=lambda n=x: run(n))
2188 for x in range(20)]
2189 for t in threads:
2190 t.start()
2191 time.sleep(0.02)
2192 event.set()
2193 for t in threads:
2194 t.join()
2195 with self.open(support.TESTFN) as f:
2196 content = f.read()
2197 for n in range(20):
Ezio Melotti2623a372010-11-21 13:34:58 +00002198 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002199
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002200 def test_flush_error_on_close(self):
2201 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2202 def bad_flush():
2203 raise IOError()
2204 txt.flush = bad_flush
2205 self.assertRaises(IOError, txt.close) # exception not swallowed
2206
2207 def test_multi_close(self):
2208 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2209 txt.close()
2210 txt.close()
2211 txt.close()
2212 self.assertRaises(ValueError, txt.flush)
2213
Antoine Pitrou19690592009-06-12 20:14:08 +00002214class CTextIOWrapperTest(TextIOWrapperTest):
2215
2216 def test_initialization(self):
2217 r = self.BytesIO(b"\xc3\xa9\n\n")
2218 b = self.BufferedReader(r, 1000)
2219 t = self.TextIOWrapper(b)
2220 self.assertRaises(TypeError, t.__init__, b, newline=42)
2221 self.assertRaises(ValueError, t.read)
2222 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2223 self.assertRaises(ValueError, t.read)
2224
2225 def test_garbage_collection(self):
2226 # C TextIOWrapper objects are collected, and collecting them flushes
2227 # all data to disk.
2228 # The Python version has __del__, so it ends in gc.garbage instead.
2229 rawio = io.FileIO(support.TESTFN, "wb")
2230 b = self.BufferedWriter(rawio)
2231 t = self.TextIOWrapper(b, encoding="ascii")
2232 t.write("456def")
2233 t.x = t
2234 wr = weakref.ref(t)
2235 del t
2236 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002237 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002238 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002239 self.assertEqual(f.read(), b"456def")
2240
2241class PyTextIOWrapperTest(TextIOWrapperTest):
2242 pass
2243
2244
2245class IncrementalNewlineDecoderTest(unittest.TestCase):
2246
2247 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002248 # UTF-8 specific tests for a newline decoder
2249 def _check_decode(b, s, **kwargs):
2250 # We exercise getstate() / setstate() as well as decode()
2251 state = decoder.getstate()
Ezio Melotti2623a372010-11-21 13:34:58 +00002252 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002253 decoder.setstate(state)
Ezio Melotti2623a372010-11-21 13:34:58 +00002254 self.assertEqual(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002255
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002256 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002257
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002258 _check_decode(b'\xe8', "")
2259 _check_decode(b'\xa2', "")
2260 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002261
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002262 _check_decode(b'\xe8', "")
2263 _check_decode(b'\xa2', "")
2264 _check_decode(b'\x88', "\u8888")
2265
2266 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002267 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2268
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002269 decoder.reset()
2270 _check_decode(b'\n', "\n")
2271 _check_decode(b'\r', "")
2272 _check_decode(b'', "\n", final=True)
2273 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002274
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002275 _check_decode(b'\r', "")
2276 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002277
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002278 _check_decode(b'\r\r\n', "\n\n")
2279 _check_decode(b'\r', "")
2280 _check_decode(b'\r', "\n")
2281 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002282
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002283 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2284 _check_decode(b'\xe8\xa2\x88', "\u8888")
2285 _check_decode(b'\n', "\n")
2286 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2287 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002288
Antoine Pitrou19690592009-06-12 20:14:08 +00002289 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002290 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002291 if encoding is not None:
2292 encoder = codecs.getincrementalencoder(encoding)()
2293 def _decode_bytewise(s):
2294 # Decode one byte at a time
2295 for b in encoder.encode(s):
2296 result.append(decoder.decode(b))
2297 else:
2298 encoder = None
2299 def _decode_bytewise(s):
2300 # Decode one char at a time
2301 for c in s:
2302 result.append(decoder.decode(c))
Ezio Melotti2623a372010-11-21 13:34:58 +00002303 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002304 _decode_bytewise("abc\n\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002305 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002306 _decode_bytewise("\nabc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002307 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002308 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002309 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002310 _decode_bytewise("abc")
Ezio Melotti2623a372010-11-21 13:34:58 +00002311 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002312 _decode_bytewise("abc\r")
Ezio Melotti2623a372010-11-21 13:34:58 +00002313 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002314 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002315 input = "abc"
2316 if encoder is not None:
2317 encoder.reset()
2318 input = encoder.encode(input)
Ezio Melotti2623a372010-11-21 13:34:58 +00002319 self.assertEqual(decoder.decode(input), "abc")
2320 self.assertEqual(decoder.newlines, None)
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002321
2322 def test_newline_decoder(self):
2323 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002324 # None meaning the IncrementalNewlineDecoder takes unicode input
2325 # rather than bytes input
2326 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002327 'utf-16', 'utf-16-le', 'utf-16-be',
2328 'utf-32', 'utf-32-le', 'utf-32-be',
2329 )
2330 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002331 decoder = enc and codecs.getincrementaldecoder(enc)()
2332 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2333 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002334 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002335 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2336 self.check_newline_decoding_utf8(decoder)
2337
2338 def test_newline_bytes(self):
2339 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2340 def _check(dec):
Ezio Melotti2623a372010-11-21 13:34:58 +00002341 self.assertEqual(dec.newlines, None)
2342 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2343 self.assertEqual(dec.newlines, None)
2344 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2345 self.assertEqual(dec.newlines, None)
Antoine Pitrou19690592009-06-12 20:14:08 +00002346 dec = self.IncrementalNewlineDecoder(None, translate=False)
2347 _check(dec)
2348 dec = self.IncrementalNewlineDecoder(None, translate=True)
2349 _check(dec)
2350
2351class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2352 pass
2353
2354class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2355 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002356
Christian Heimes1a6387e2008-03-26 12:49:49 +00002357
2358# XXX Tests for open()
2359
2360class MiscIOTest(unittest.TestCase):
2361
Benjamin Petersonad100c32008-11-20 22:06:22 +00002362 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002363 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002364
Antoine Pitrou19690592009-06-12 20:14:08 +00002365 def test___all__(self):
2366 for name in self.io.__all__:
2367 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002368 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002369 if name == "open":
2370 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002371 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002372 self.assertTrue(issubclass(obj, Exception), name)
2373 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002374 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002375
Benjamin Petersonad100c32008-11-20 22:06:22 +00002376 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002377 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melotti2623a372010-11-21 13:34:58 +00002378 self.assertEqual(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002379 f.close()
2380
Antoine Pitrou19690592009-06-12 20:14:08 +00002381 f = self.open(support.TESTFN, "U")
Ezio Melotti2623a372010-11-21 13:34:58 +00002382 self.assertEqual(f.name, support.TESTFN)
2383 self.assertEqual(f.buffer.name, support.TESTFN)
2384 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2385 self.assertEqual(f.mode, "U")
2386 self.assertEqual(f.buffer.mode, "rb")
2387 self.assertEqual(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002388 f.close()
2389
Antoine Pitrou19690592009-06-12 20:14:08 +00002390 f = self.open(support.TESTFN, "w+")
Ezio Melotti2623a372010-11-21 13:34:58 +00002391 self.assertEqual(f.mode, "w+")
2392 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2393 self.assertEqual(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002394
Antoine Pitrou19690592009-06-12 20:14:08 +00002395 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melotti2623a372010-11-21 13:34:58 +00002396 self.assertEqual(g.mode, "wb")
2397 self.assertEqual(g.raw.mode, "wb")
2398 self.assertEqual(g.name, f.fileno())
2399 self.assertEqual(g.raw.name, f.fileno())
Benjamin Petersonad100c32008-11-20 22:06:22 +00002400 f.close()
2401 g.close()
2402
Antoine Pitrou19690592009-06-12 20:14:08 +00002403 def test_io_after_close(self):
2404 for kwargs in [
2405 {"mode": "w"},
2406 {"mode": "wb"},
2407 {"mode": "w", "buffering": 1},
2408 {"mode": "w", "buffering": 2},
2409 {"mode": "wb", "buffering": 0},
2410 {"mode": "r"},
2411 {"mode": "rb"},
2412 {"mode": "r", "buffering": 1},
2413 {"mode": "r", "buffering": 2},
2414 {"mode": "rb", "buffering": 0},
2415 {"mode": "w+"},
2416 {"mode": "w+b"},
2417 {"mode": "w+", "buffering": 1},
2418 {"mode": "w+", "buffering": 2},
2419 {"mode": "w+b", "buffering": 0},
2420 ]:
2421 f = self.open(support.TESTFN, **kwargs)
2422 f.close()
2423 self.assertRaises(ValueError, f.flush)
2424 self.assertRaises(ValueError, f.fileno)
2425 self.assertRaises(ValueError, f.isatty)
2426 self.assertRaises(ValueError, f.__iter__)
2427 if hasattr(f, "peek"):
2428 self.assertRaises(ValueError, f.peek, 1)
2429 self.assertRaises(ValueError, f.read)
2430 if hasattr(f, "read1"):
2431 self.assertRaises(ValueError, f.read1, 1024)
2432 if hasattr(f, "readinto"):
2433 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2434 self.assertRaises(ValueError, f.readline)
2435 self.assertRaises(ValueError, f.readlines)
2436 self.assertRaises(ValueError, f.seek, 0)
2437 self.assertRaises(ValueError, f.tell)
2438 self.assertRaises(ValueError, f.truncate)
2439 self.assertRaises(ValueError, f.write,
2440 b"" if "b" in kwargs['mode'] else "")
2441 self.assertRaises(ValueError, f.writelines, [])
2442 self.assertRaises(ValueError, next, f)
2443
2444 def test_blockingioerror(self):
2445 # Various BlockingIOError issues
2446 self.assertRaises(TypeError, self.BlockingIOError)
2447 self.assertRaises(TypeError, self.BlockingIOError, 1)
2448 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2449 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2450 b = self.BlockingIOError(1, "")
2451 self.assertEqual(b.characters_written, 0)
2452 class C(unicode):
2453 pass
2454 c = C("")
2455 b = self.BlockingIOError(1, c)
2456 c.b = b
2457 b.c = c
2458 wr = weakref.ref(c)
2459 del c, b
2460 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002461 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002462
2463 def test_abcs(self):
2464 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002465 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2466 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2467 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2468 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002469
2470 def _check_abc_inheritance(self, abcmodule):
2471 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002472 self.assertIsInstance(f, abcmodule.IOBase)
2473 self.assertIsInstance(f, abcmodule.RawIOBase)
2474 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2475 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002476 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002477 self.assertIsInstance(f, abcmodule.IOBase)
2478 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2479 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2480 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002481 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002482 self.assertIsInstance(f, abcmodule.IOBase)
2483 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2484 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2485 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002486
2487 def test_abc_inheritance(self):
2488 # Test implementations inherit from their respective ABCs
2489 self._check_abc_inheritance(self)
2490
2491 def test_abc_inheritance_official(self):
2492 # Test implementations inherit from the official ABCs of the
2493 # baseline "io" module.
2494 self._check_abc_inheritance(io)
2495
2496class CMiscIOTest(MiscIOTest):
2497 io = io
2498
2499class PyMiscIOTest(MiscIOTest):
2500 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002501
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002502
2503@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2504class SignalsTest(unittest.TestCase):
2505
2506 def setUp(self):
2507 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2508
2509 def tearDown(self):
2510 signal.signal(signal.SIGALRM, self.oldalrm)
2511
2512 def alarm_interrupt(self, sig, frame):
Florent Xicluna3fa3b002010-09-13 08:20:19 +00002513 1 // 0
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002514
2515 @unittest.skipUnless(threading, 'Threading required for this test.')
2516 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2517 """Check that a partial write, when it gets interrupted, properly
2518 invokes the signal handler."""
2519 read_results = []
2520 def _read():
2521 s = os.read(r, 1)
2522 read_results.append(s)
2523 t = threading.Thread(target=_read)
2524 t.daemon = True
2525 r, w = os.pipe()
2526 try:
2527 wio = self.io.open(w, **fdopen_kwargs)
2528 t.start()
2529 signal.alarm(1)
2530 # Fill the pipe enough that the write will be blocking.
2531 # It will be interrupted by the timer armed above. Since the
2532 # other thread has read one byte, the low-level write will
2533 # return with a successful (partial) result rather than an EINTR.
2534 # The buffered IO layer must check for pending signal
2535 # handlers, which in this case will invoke alarm_interrupt().
2536 self.assertRaises(ZeroDivisionError,
2537 wio.write, item * (1024 * 1024))
2538 t.join()
2539 # We got one byte, get another one and check that it isn't a
2540 # repeat of the first one.
2541 read_results.append(os.read(r, 1))
2542 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2543 finally:
2544 os.close(w)
2545 os.close(r)
2546 # This is deliberate. If we didn't close the file descriptor
2547 # before closing wio, wio would try to flush its internal
2548 # buffer, and block again.
2549 try:
2550 wio.close()
2551 except IOError as e:
2552 if e.errno != errno.EBADF:
2553 raise
2554
2555 def test_interrupted_write_unbuffered(self):
2556 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2557
2558 def test_interrupted_write_buffered(self):
2559 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2560
2561 def test_interrupted_write_text(self):
2562 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2563
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002564 def check_reentrant_write(self, data, **fdopen_kwargs):
2565 def on_alarm(*args):
2566 # Will be called reentrantly from the same thread
2567 wio.write(data)
2568 1/0
2569 signal.signal(signal.SIGALRM, on_alarm)
2570 r, w = os.pipe()
2571 wio = self.io.open(w, **fdopen_kwargs)
2572 try:
2573 signal.alarm(1)
2574 # Either the reentrant call to wio.write() fails with RuntimeError,
2575 # or the signal handler raises ZeroDivisionError.
2576 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2577 while 1:
2578 for i in range(100):
2579 wio.write(data)
2580 wio.flush()
2581 # Make sure the buffer doesn't fill up and block further writes
2582 os.read(r, len(data) * 100)
2583 exc = cm.exception
2584 if isinstance(exc, RuntimeError):
2585 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2586 finally:
2587 wio.close()
2588 os.close(r)
2589
2590 def test_reentrant_write_buffered(self):
2591 self.check_reentrant_write(b"xy", mode="wb")
2592
2593 def test_reentrant_write_text(self):
2594 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2595
2596
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002597class CSignalsTest(SignalsTest):
2598 io = io
2599
2600class PySignalsTest(SignalsTest):
2601 io = pyio
2602
Antoine Pitrou4cb64ad2010-12-03 19:31:52 +00002603 # Handling reentrancy issues would slow down _pyio even more, so the
2604 # tests are disabled.
2605 test_reentrant_write_buffered = None
2606 test_reentrant_write_text = None
2607
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002608
Christian Heimes1a6387e2008-03-26 12:49:49 +00002609def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00002610 tests = (CIOTest, PyIOTest,
2611 CBufferedReaderTest, PyBufferedReaderTest,
2612 CBufferedWriterTest, PyBufferedWriterTest,
2613 CBufferedRWPairTest, PyBufferedRWPairTest,
2614 CBufferedRandomTest, PyBufferedRandomTest,
2615 StatefulIncrementalDecoderTest,
2616 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2617 CTextIOWrapperTest, PyTextIOWrapperTest,
2618 CMiscIOTest, PyMiscIOTest,
Antoine Pitrou3ebaed62010-08-21 19:17:25 +00002619 CSignalsTest, PySignalsTest,
Antoine Pitrou19690592009-06-12 20:14:08 +00002620 )
2621
2622 # Put the namespaces of the IO module we are testing and some useful mock
2623 # classes in the __dict__ of each test.
2624 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou6391b342010-09-14 18:48:19 +00002625 MockNonBlockWriterIO, MockRawIOWithoutRead)
Antoine Pitrou19690592009-06-12 20:14:08 +00002626 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2627 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
2628 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
2629 globs = globals()
2630 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2631 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2632 # Avoid turning open into a bound method.
2633 py_io_ns["open"] = pyio.OpenWrapper
2634 for test in tests:
2635 if test.__name__.startswith("C"):
2636 for name, obj in c_io_ns.items():
2637 setattr(test, name, obj)
2638 elif test.__name__.startswith("Py"):
2639 for name, obj in py_io_ns.items():
2640 setattr(test, name, obj)
2641
2642 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002643
2644if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00002645 test_main()