blob: edb593b23d50623875477e444b38c1bd1f2ca981 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import threading
30import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000031import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000032import warnings
33import weakref
34import gc
35import abc
36from itertools import chain, cycle, count
37from collections import deque
38from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000039
40import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
43
44__metaclass__ = type
45bytes = support.py3k_bytes
46
def _default_chunk_size():
    """Return the ``_CHUNK_SIZE`` of a text stream opened with ``io.open``.

    This very source file is opened (latin1 decoding never fails) purely to
    obtain a text wrapper to read the attribute from.
    """
    with io.open(__file__, "r", encoding="latin1") as text_stream:
        chunk_size = text_stream._CHUNK_SIZE
    return chunk_size
Christian Heimes1a6387e2008-03-26 12:49:49 +000051
52
class MockRawIO:
    """In-memory stand-in for a raw stream, driven by a scripted read queue.

    ``read_stack`` is an iterable of items handed out in order by ``read()``
    and ``readinto()``.  A ``None`` item is passed through as-is (the tests
    use it to simulate a read that would block).  Everything passed to
    ``write()`` is recorded in ``_write_stack`` and every read attempt is
    counted in ``_reads``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0

    def read(self, n=None):
        """Pop and return the next scripted item; ``b""`` once exhausted.

        ``n`` is ignored: the whole item is returned whatever its size.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only "queue is empty" means EOF; any other error must surface
            # instead of being silently reported as end-of-file.
            return b""

    def write(self, b):
        """Record a bytes copy of ``b`` and report it as fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next scripted item into ``buf``.

        Returns the number of bytes copied, ``0`` when the queue is empty,
        or ``None`` when the next item is ``None``.  An item larger than
        ``buf`` is split: the head fills ``buf`` and the tail stays queued.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        # Item does not fit: deliver what fits, keep the remainder queued.
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        return pos
111
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO layered on the C implementation's RawIOBase."""
114
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO layered on the pure-Python (_pyio) RawIOBase."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000117
118
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately bogus results."""

    def write(self, b):
        # Claim twice as many bytes as were actually recorded.
        recorded = MockRawIO.write(self, b)
        return recorded * 2

    def read(self, n=None):
        # Hand back the scripted item repeated twice.
        item = MockRawIO.read(self, n)
        return item * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Perform the real copy, then report an impossible byte count.
        MockRawIO.readinto(self, buf)
        bogus_count = len(buf) * 5
        return bogus_count
135
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO layered on the C implementation's RawIOBase."""
138
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO layered on the pure-Python (_pyio) RawIOBase."""
141
142
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() marks it closed and raises IOError."""

    closed = 0

    def close(self):
        # Later calls are silent no-ops; only the first one fails.
        if self.closed:
            return
        self.closed = 1
        raise IOError
149 raise IOError
150
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO layered on the C implementation's RawIOBase."""
153
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO layered on the pure-Python (_pyio) RawIOBase."""
156
157
class MockFileIO:
    """Mixin that logs the size of every read performed on the next base.

    ``read_history`` receives one entry per ``read()`` / ``readinto()``
    call: the number of bytes obtained, or ``None`` when the underlying
    call returned ``None``.
    """

    def __init__(self, data):
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        chunk = super(MockFileIO, self).read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super(MockFileIO, self).readinto(b)
        self.read_history.append(count)
        return count
Christian Heimes1a6387e2008-03-26 12:49:49 +0000173
class CMockFileIO(MockFileIO, io.BytesIO):
    """Read-logging mixin applied to the C implementation's BytesIO."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000176
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """Read-logging mixin applied to the pure-Python (_pyio) BytesIO."""
179
180
class MockNonBlockWriterIO:
    """Write-recording raw stream that can be told to "block" mid-write.

    Subclasses must provide a ``BlockingIOError`` attribute naming the
    exception class raised when the blocking marker is hit.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and reset the record."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record ``b``; stop short and raise if the blocker is in it.

        When the armed blocker occurs in the data, only the bytes before it
        are recorded, the blocker is disarmed, and ``self.BlockingIOError``
        is raised carrying the count of bytes recorded.
        """
        data = bytes(b)
        blocker = self._blocker_char
        if blocker:
            cut = data.find(blocker)
            if cut != -1:
                self._blocker_char = None
                self._write_stack.append(data[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(data)
        return len(data)
219
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """Blocking-writer mock bound to the C implementation (io)."""

    BlockingIOError = io.BlockingIOError
222
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """Blocking-writer mock bound to the pure-Python implementation (_pyio)."""

    BlockingIOError = pyio.BlockingIOError
225
Christian Heimes1a6387e2008-03-26 12:49:49 +0000226
class IOTest(unittest.TestCase):
    """General io tests written against attributes of the test instance.

    The tests reach the implementation under test through ``self.open``,
    ``self.FileIO``, ``self.BytesIO`` and the ``self.*IOBase`` classes.
    None of these are defined in this class; presumably they are attached
    to the concrete subclasses elsewhere in the module (verify there).
    """

    def setUp(self):
        # Make sure no stale scratch file influences the test.
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Exercise write/seek/tell/truncate on a writable binary stream.

        The stream ends up holding ``hello world`` followed by one newline.
        """
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 12)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Exercise read/readinto/seek/tell on the data left by write_ops.

        With ``buffered`` true, argument-less ``read()`` calls are checked
        as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        """Exercise seek/tell/write/truncate around the ``LARGE`` offset."""
        # Use unittest assertions rather than bare ``assert`` so the checks
        # survive ``python -O`` and are reported as test failures.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 1)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform.startswith('win') or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The stream must be closed on leaving the ``with`` block, whether
        # the body completes normally or raises.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Destroying an unclosed FileIO subclass must invoke __del__,
        # close() and flush(), in that order, and keep the written data.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyFileIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check __del__/close/flush ordering for a subclass of ``base``."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array(b'i', range(10))
        n = len(a.tostring())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # ``fd_file`` (not ``file``) to avoid shadowing the builtin.
            fd_file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fd_file.read(), "egg\n")
            fd_file.seek(0)
            fd_file.close()
            self.assertRaises(ValueError, fd_file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r",
                          closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            fd_file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fd_file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Create a reference cycle so only the cyclic GC can reclaim it.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000536
class CIOTest(IOTest):
    """IOTest variant for the C implementation.

    NOTE(review): the implementation attributes are not set here;
    presumably they are attached elsewhere in the module -- confirm.
    """
Christian Heimes1a6387e2008-03-26 12:49:49 +0000539
class PyIOTest(IOTest):
    """IOTest variant for the Python implementation.

    NOTE(review): the implementation attributes are not set here;
    presumably they are attached elsewhere in the module -- confirm.
    """

    # The array-write check is disabled for this variant; the skip reason
    # below explains why.
    test_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000544
545
class CommonBufferedTests:
    """Mixin of tests shared by the buffered stream test cases.

    Users of the mixin must provide ``self.tp`` (the buffered class under
    test) plus ``self.MockRawIO`` and ``self.CloseFailureIO`` (raw stream
    mocks); none of them are defined here.
    """
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom

    def test_detach(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        # A second detach has nothing left to hand back.
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        # Overridden __del__/close/flush must be called on destruction;
        # flush is only expected for writable streams.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyBufferedIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyBufferedIO, self).close()
            def flush(self):
                record.append(3)
                super(MyBufferedIO, self).flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000637
638
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    """Tests for the buffered reader class referenced by ``self.tp``.

    Besides ``tp`` the tests use ``self.MockRawIO``, ``self.MockFileIO``,
    ``self.MisbehavedRawIO`` and ``self.open``, none of which are defined
    in this class.
    """
    read_mode = "rb"

    def test_constructor(self):
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        # Re-initialising an existing object must be supported.
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcdef", bufio.read(6))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # ``rawio._reads`` tracks how many raw reads each read1() triggered.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # A short read only overwrites the head of the target buffer.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes requested from the buffered
        # reader, and the raw read sizes expected as a consequence.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

    def test_read_past_eof(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            s = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        # Record for the main thread, then let it propagate.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # Bogus positions reported by the raw stream must be rejected.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
785
class CBufferedReaderTest(BufferedReaderTest):
    """BufferedReaderTest bound to the C implementation's BufferedReader."""
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialisation, read() must raise ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # This class defines no tearDown, so remove the scratch file created
        # below explicitly once the test is over.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Create a reference cycle so only the cyclic GC can reclaim it.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000826
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited BufferedReader tests against the pure-Python
    # implementation (_pyio, imported as pyio).
    tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000829
830
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    """Tests for buffered writer objects.

    The class under test is read from ``self.tp``, which the concrete
    subclasses set to the C or the pure-Python implementation.
    """
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called again on a live object; invalid buffer
        # sizes are rejected with ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push pending data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is invoked after every single write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents,
            b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seeking away and back (absolute, then relative) between writes
        # must not disturb the written data.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush buffered data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        try:
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                bufio.write(b"abcdef")
                self.assertEqual(bufio.truncate(3), 3)
                self.assertEqual(bufio.tell(), 3)
            with self.open(support.TESTFN, "rb", buffering=0) as f:
                self.assertEqual(f.read(), b"abc")
        finally:
            # The test writes TESTFN to disk; do not leave it behind.
            support.unlink(support.TESTFN)

    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        # Record for the main thread, then re-raise.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing the third positional argument (max_buffer_size) must emit
        # exactly one DeprecationWarning.
        with support.check_warnings() as w:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), 8, 12)
            self.assertEqual(len(w.warnings), 1)
            warning = w.warnings[0]
            self.assertTrue(warning.category is DeprecationWarning)
            self.assertEqual(str(warning.message),
                             "max_buffer_size is deprecated")
1050
1051
class CBufferedWriterTest(BufferedWriterTest):
    """Run the BufferedWriter tests against the C implementation (io)."""
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # Re-initializing with an invalid buffer size must fail, and the
        # object must then refuse to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends up in gc.garbage instead.
        try:
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            # Create a reference cycle so only the cyclic GC can free it.
            f.x = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
            self.assertTrue(wr() is None, wr)
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"123xxx")
        finally:
            # The test writes TESTFN to disk; do not leave it behind.
            support.unlink(support.TESTFN)
1089
1090
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the inherited BufferedWriter tests against the pure-Python
    # implementation (_pyio, imported as pyio).
    tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001093
class BufferedRWPairTest(unittest.TestCase):
    """Tests for the reader/writer pair object found in ``self.tp``."""

    def test_constructor(self):
        reader = self.MockRawIO()
        writer = self.MockRawIO()
        rw = self.tp(reader, writer)
        self.assertFalse(rw.closed)

    def test_detach(self):
        reader = self.MockRawIO()
        writer = self.MockRawIO()
        rw = self.tp(reader, writer)
        self.assertRaises(self.UnsupportedOperation, rw.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Supplying the fourth positional argument must emit exactly one
        # DeprecationWarning with a fixed message.
        with support.check_warnings() as caught:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
            self.assertEqual(len(caught.warnings), 1)
            first = caught.warnings[0]
            self.assertTrue(first.category is DeprecationWarning)
            self.assertEqual(str(first.message),
                             "max_buffer_size is deprecated")

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        unreadable = NotReadable()
        writer = self.MockRawIO()
        self.assertRaises(IOError, self.tp, unreadable, writer)

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        reader = self.MockRawIO()
        unwritable = NotWriteable()
        self.assertRaises(IOError, self.tp, reader, unwritable)

    def test_read(self):
        rw = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # (arguments to read(), expected result), consumed in order.
        steps = (((3,), b"abc"), ((1,), b"d"), ((), b"ef"))
        for args, expected in steps:
            self.assertEqual(rw.read(*args), expected)

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        rw = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        chunk = rw.read1(3)
        self.assertEqual(chunk, b"abc")

    def test_readinto(self):
        rw = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        target = bytearray(5)
        filled = rw.readinto(target)
        self.assertEqual(filled, 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        rw = self.tp(self.MockRawIO(), sink)

        # Each write is flushed on its own, so each lands as one entry.
        for payload in (b"abc", b"def"):
            rw.write(payload)
            rw.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        rw = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        peeked = rw.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        # Peeking must not consume: the same bytes are still readable.
        self.assertEqual(rw.read(3), b"abc")

    def test_readable(self):
        rw = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw.readable())

    def test_writeable(self):
        rw = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        rw = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        rw = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw.closed)
        rw.close()
        self.assertTrue(rw.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # (reader is a tty, writer is a tty, assertion to apply): the pair
        # reports a tty as soon as either side does.
        expectations = (
            (False, False, self.assertFalse),
            (True, False, self.assertTrue),
            (False, True, self.assertTrue),
            (True, True, self.assertTrue),
        )
        for reader_tty, writer_tty, check in expectations:
            rw = self.tp(SelectableIsAtty(reader_tty),
                         SelectableIsAtty(writer_tty))
            check(rw.isatty())
1208
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the inherited BufferedRWPair tests against the C implementation
    # (io).
    tp = io.BufferedRWPair
1211
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the inherited BufferedRWPair tests against the pure-Python
    # implementation (_pyio, imported as pyio).
    tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001214
1215
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    """Tests for buffered objects that are both readable and writable.

    Combines the reader and writer test suites and adds tests that
    interleave reads, writes, seeks and flushes on the same object.
    """
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # read_func(bufio[, n]) must return the bytes it consumed; it is
        # called both with and without the size argument.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # With no explicit size, use a buffer larger than the stream.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance; move past what was "read".
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            # Step back one byte and read it again.
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)
1362
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
    """Run the BufferedRandom tests against the C implementation (io)."""
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          buffered.__init__, raw, sys.maxsize)

    def test_garbage_collection(self):
        # Exercise the reader variant first, then the writer variant.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1379
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the inherited BufferedRandom tests against the pure-Python
    # implementation (_pyio, imported as pyio).
    tp = pyio.BufferedRandom
1382
1383
Christian Heimes1a6387e2008-03-26 12:49:49 +00001384# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1385# properties:
1386# - A single output character can correspond to many bytes of input.
1387# - The number of input bytes to complete the character can be
1388# undetermined until the last input byte is received.
1389# - The number of input bytes can vary depending on previous input.
1390# - A single input byte can correspond to many characters of output.
1391# - The number of output characters can be undetermined until the
1392# last input byte is received.
1393# - The number of output characters can vary depending on previous input.
1394
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with flags packing I and O."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        # 'flags' rather than 'io' so the imported io module is not shadowed.
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return the text decoded so far.

        When *final* is true, any partially buffered word is emitted too.
        """
        period = ord('.')
        pieces = []
        # Go through a bytearray so that every element is an int, whether
        # iterating the input natively yields ints or 1-character strings.
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    # Extra periods (empty buffer) are ignored.
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer: # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return its output text.

        Control words ('i...' / 'o...') update I or O and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Gate for lookupTestDecoder(); off unless explicitly switched on.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function answering only for 'test_decoder'.

        Returns a CodecInfo that encodes as latin-1 and decodes
        incrementally with this class, or None when the codec is disabled
        or *name* is some other codec.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
1479
# Register the lookup function for the 'test_decoder' codec defined above.
# The lookup only answers while StatefulIncrementalDecoder.codecEnabled is
# true; it is False by default and tests switch it on when they need it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1483
1484
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
1527
1528class TextIOWrapperTest(unittest.TestCase):
1529
1530 def setUp(self):
1531 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1532 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001533 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001534
1535 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001536 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001537
Antoine Pitrou19690592009-06-12 20:14:08 +00001538 def test_constructor(self):
1539 r = self.BytesIO(b"\xc3\xa9\n\n")
1540 b = self.BufferedReader(r, 1000)
1541 t = self.TextIOWrapper(b)
1542 t.__init__(b, encoding="latin1", newline="\r\n")
1543 self.assertEquals(t.encoding, "latin1")
1544 self.assertEquals(t.line_buffering, False)
1545 t.__init__(b, encoding="utf8", line_buffering=True)
1546 self.assertEquals(t.encoding, "utf8")
1547 self.assertEquals(t.line_buffering, True)
1548 self.assertEquals("\xe9\n", t.readline())
1549 self.assertRaises(TypeError, t.__init__, b, newline=42)
1550 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1551
1552 def test_detach(self):
1553 r = self.BytesIO()
1554 b = self.BufferedWriter(r)
1555 t = self.TextIOWrapper(b)
1556 self.assertIs(t.detach(), b)
1557
1558 t = self.TextIOWrapper(b, encoding="ascii")
1559 t.write("howdy")
1560 self.assertFalse(r.getvalue())
1561 t.detach()
1562 self.assertEqual(r.getvalue(), b"howdy")
1563 self.assertRaises(ValueError, t.detach)
1564
1565 def test_repr(self):
1566 raw = self.BytesIO("hello".encode("utf-8"))
1567 b = self.BufferedReader(raw)
1568 t = self.TextIOWrapper(b, encoding="utf-8")
1569 modname = self.TextIOWrapper.__module__
1570 self.assertEqual(repr(t),
1571 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1572 raw.name = "dummy"
1573 self.assertEqual(repr(t),
1574 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1575 raw.name = b"dummy"
1576 self.assertEqual(repr(t),
1577 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1578
1579 def test_line_buffering(self):
1580 r = self.BytesIO()
1581 b = self.BufferedWriter(r, 1000)
1582 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1583 t.write("X")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001584 self.assertEquals(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001585 t.write("Y\nZ")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001586 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001587 t.write("A\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001588 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1589
Antoine Pitrou19690592009-06-12 20:14:08 +00001590 def test_encoding(self):
1591 # Check the encoding attribute is always set, and valid
1592 b = self.BytesIO()
1593 t = self.TextIOWrapper(b, encoding="utf8")
1594 self.assertEqual(t.encoding, "utf8")
1595 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001596 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001597 codecs.lookup(t.encoding)
1598
1599 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001600 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001601 b = self.BytesIO(b"abc\n\xff\n")
1602 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001603 self.assertRaises(UnicodeError, t.read)
1604 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001605 b = self.BytesIO(b"abc\n\xff\n")
1606 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001607 self.assertRaises(UnicodeError, t.read)
1608 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001609 b = self.BytesIO(b"abc\n\xff\n")
1610 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001611 self.assertEquals(t.read(), "abc\n\n")
1612 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001613 b = self.BytesIO(b"abc\n\xff\n")
1614 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
1615 self.assertEquals(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001616
Antoine Pitrou19690592009-06-12 20:14:08 +00001617 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001618 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001619 b = self.BytesIO()
1620 t = self.TextIOWrapper(b, encoding="ascii")
1621 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001622 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001623 b = self.BytesIO()
1624 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1625 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001626 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001627 b = self.BytesIO()
1628 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001629 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001630 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001631 t.flush()
1632 self.assertEquals(b.getvalue(), b"abcdef\n")
1633 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001634 b = self.BytesIO()
1635 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001636 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001637 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001638 t.flush()
1639 self.assertEquals(b.getvalue(), b"abc?def\n")
1640
Antoine Pitrou19690592009-06-12 20:14:08 +00001641 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001642 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1643
1644 tests = [
1645 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1646 [ '', input_lines ],
1647 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1648 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1649 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1650 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001651 encodings = (
1652 'utf-8', 'latin-1',
1653 'utf-16', 'utf-16-le', 'utf-16-be',
1654 'utf-32', 'utf-32-le', 'utf-32-be',
1655 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001656
1657 # Try a range of buffer sizes to test the case where \r is the last
1658 # character in TextIOWrapper._pending_line.
1659 for encoding in encodings:
1660 # XXX: str.encode() should return bytes
1661 data = bytes(''.join(input_lines).encode(encoding))
1662 for do_reads in (False, True):
1663 for bufsize in range(1, 10):
1664 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001665 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1666 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001667 encoding=encoding)
1668 if do_reads:
1669 got_lines = []
1670 while True:
1671 c2 = textio.read(2)
1672 if c2 == '':
1673 break
1674 self.assertEquals(len(c2), 2)
1675 got_lines.append(c2 + textio.readline())
1676 else:
1677 got_lines = list(textio)
1678
1679 for got_line, exp_line in zip(got_lines, exp_lines):
1680 self.assertEquals(got_line, exp_line)
1681 self.assertEquals(len(got_lines), len(exp_lines))
1682
Antoine Pitrou19690592009-06-12 20:14:08 +00001683 def test_newlines_input(self):
1684 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001685 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1686 for newline, expected in [
1687 (None, normalized.decode("ascii").splitlines(True)),
1688 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001689 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1690 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1691 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001692 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001693 buf = self.BytesIO(testdata)
1694 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001695 self.assertEquals(txt.readlines(), expected)
1696 txt.seek(0)
1697 self.assertEquals(txt.read(), "".join(expected))
1698
Antoine Pitrou19690592009-06-12 20:14:08 +00001699 def test_newlines_output(self):
1700 testdict = {
1701 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1702 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1703 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1704 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1705 }
1706 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1707 for newline, expected in tests:
1708 buf = self.BytesIO()
1709 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1710 txt.write("AAA\nB")
1711 txt.write("BB\nCCC\n")
1712 txt.write("X\rY\r\nZ")
1713 txt.flush()
1714 self.assertEquals(buf.closed, False)
1715 self.assertEquals(buf.getvalue(), expected)
1716
def test_destructor(self):
    # Dropping the last reference to a wrapper must close the underlying
    # stream, with the written text already visible in it at that point.
    l = []
    base = self.BytesIO
    class MyBytesIO(base):
        def close(self):
            # Record the contents at the moment close() is invoked.
            l.append(self.getvalue())
            base.close(self)
    b = MyBytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    t.write("abc")
    del t
    support.gc_collect()
    self.assertEqual([b"abc"], l)
1730
def test_override_destructor(self):
    # A subclass overriding __del__, close() and flush() must see them
    # invoked in that order when the wrapper is collected.
    calls = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            try:
                parent_del = super(MyTextIO, self).__del__
            except AttributeError:
                # The base class defines no __del__; nothing to chain to.
                return
            parent_del()
        def close(self):
            calls.append(2)
            super(MyTextIO, self).close()
        def flush(self):
            calls.append(3)
            super(MyTextIO, self).flush()
    stream = self.BytesIO()
    wrapper = MyTextIO(stream, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
1753
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    failing_raw = self.CloseFailureIO()
    def touch_missing_attribute():
        self.TextIOWrapper(failing_raw).xyzzy
    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001768
1769 # Systematic tests of the text I/O API
1770
def test_basic_io(self):
    # Round-trip a short string through a text file for several chunk
    # sizes and encodings, checking that write/read/seek/tell agree.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
            # Context managers close the file even if an assertion fails.
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
1797
def multi_line_test(self, f, enc):
    # Helper: empty the open text file `f`, write lines of assorted
    # lengths while recording tell() before each one, then read them back
    # and check that every (position, line) pair matches.
    # `enc` is accepted for the caller's convenience but not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of `size`.
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
1819
def test_telling(self):
    # tell() must give consistent positions while writing and while
    # reading with readline(), and must raise IOError while the file is
    # being iterated over.
    # The context manager closes the file even if an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            self.assertRaises(IOError, f.tell)
        # Once iteration is exhausted tell() works again.
        self.assertEqual(f.tell(), p2)
1839
def test_seeking(self):
    # Build two lines whose multi-byte suffix starts two characters before
    # the default chunk size, then check read()/tell()/readline() there.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    # The original never closed this handle; the context manager does.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
1857
def test_seeking_too(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # The original never closed this handle; the context manager does.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        # Only checks that tell() does not raise at this point.
        f.tell()
1869
def test_seek_and_tell(self):
    #Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1): # seek positions
            for j in [1, 5, len(decoded) - i]: # read lengths
                # The context manager closes the file even when an
                # assertion below fails.
                with self.open(support.TESTFN, encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(case_input)

        # Position each test case so that it crosses a chunk boundary.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(case_input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + case_input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00001916
Antoine Pitrou19690592009-06-12 20:14:08 +00001917 def test_encoded_writes(self):
1918 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001919 tests = ("utf-16",
1920 "utf-16-le",
1921 "utf-16-be",
1922 "utf-32",
1923 "utf-32-le",
1924 "utf-32-be")
1925 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001926 buf = self.BytesIO()
1927 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001928 # Check if the BOM is written only once (see issue1753).
1929 f.write(data)
1930 f.write(data)
1931 f.seek(0)
1932 self.assertEquals(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00001933 f.seek(0)
1934 self.assertEquals(f.read(), data * 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001935 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1936
Antoine Pitrou19690592009-06-12 20:14:08 +00001937 def test_unreadable(self):
1938 class UnReadable(self.BytesIO):
1939 def readable(self):
1940 return False
1941 txt = self.TextIOWrapper(UnReadable())
1942 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001943
Antoine Pitrou19690592009-06-12 20:14:08 +00001944 def test_read_one_by_one(self):
1945 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001946 reads = ""
1947 while True:
1948 c = txt.read(1)
1949 if not c:
1950 break
1951 reads += c
1952 self.assertEquals(reads, "AA\nBB")
1953
1954 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00001955 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001956 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00001957 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001958 reads = ""
1959 while True:
1960 c = txt.read(128)
1961 if not c:
1962 break
1963 reads += c
1964 self.assertEquals(reads, "A"*127+"\nB")
1965
1966 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001967 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001968
1969 # read one char at a time
1970 reads = ""
1971 while True:
1972 c = txt.read(1)
1973 if not c:
1974 break
1975 reads += c
1976 self.assertEquals(reads, self.normalized)
1977
1978 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001979 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001980 txt._CHUNK_SIZE = 4
1981
1982 reads = ""
1983 while True:
1984 c = txt.read(4)
1985 if not c:
1986 break
1987 reads += c
1988 self.assertEquals(reads, self.normalized)
1989
1990 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001991 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001992 txt._CHUNK_SIZE = 4
1993
1994 reads = txt.read(4)
1995 reads += txt.read(4)
1996 reads += txt.readline()
1997 reads += txt.readline()
1998 reads += txt.readline()
1999 self.assertEquals(reads, self.normalized)
2000
2001 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002002 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002003 txt._CHUNK_SIZE = 4
2004
2005 reads = txt.read(4)
2006 reads += txt.read()
2007 self.assertEquals(reads, self.normalized)
2008
2009 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002010 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002011 txt._CHUNK_SIZE = 4
2012
2013 reads = txt.read(4)
2014 pos = txt.tell()
2015 txt.seek(0)
2016 txt.seek(pos)
2017 self.assertEquals(txt.read(4), "BBB\n")
2018
2019 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002020 buffer = self.BytesIO(self.testdata)
2021 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002022
2023 self.assertEqual(buffer.seekable(), txt.seekable())
2024
@unittest.skip("Issue #6213 with incremental encoders")
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # Result unused; the call is kept so the sequence of
            # operations on the file is unchanged.
            f.tell()
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2040
@unittest.skip("Issue #6213 with incremental encoders")
def test_seek_bom(self):
    # Same test, but when seeking manually
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            pos = f.tell()
        with self.open(filename, 'r+', encoding=charset) as f:
            # Write the tail first, then go back and overwrite the head.
            f.seek(pos)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
2056
def test_errors_property(self):
    # The `errors` attribute is "strict" when no handler is given and
    # otherwise echoes the handler passed to open().
    target = support.TESTFN
    with self.open(target, "w") as default_file:
        self.assertEqual(default_file.errors, "strict")
    with self.open(target, "w", errors="replace") as replacing_file:
        self.assertEqual(replacing_file.errors, "replace")
2062
2063
class CTextIOWrapperTest(TextIOWrapperTest):

    def test_initialization(self):
        # After a failed re-initialisation the wrapper must refuse to read.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.read)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
        self.assertRaises(ValueError, t.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        b = self.BufferedWriter(rawio)
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("456def")
        # Create a reference cycle so only the cyclic GC can reclaim `t`.
        t.x = t
        wr = weakref.ref(t)
        del t
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
2090
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Inherits every TextIOWrapperTest case without adding any of its own.

    NOTE(review): presumably bound to the pure-Python implementation
    elsewhere in the file -- confirm where the class attributes are set.
    """
2093
2094
2095class IncrementalNewlineDecoderTest(unittest.TestCase):
2096
2097 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002098 # UTF-8 specific tests for a newline decoder
2099 def _check_decode(b, s, **kwargs):
2100 # We exercise getstate() / setstate() as well as decode()
2101 state = decoder.getstate()
2102 self.assertEquals(decoder.decode(b, **kwargs), s)
2103 decoder.setstate(state)
2104 self.assertEquals(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002105
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002106 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002107
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002108 _check_decode(b'\xe8', "")
2109 _check_decode(b'\xa2', "")
2110 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002111
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002112 _check_decode(b'\xe8', "")
2113 _check_decode(b'\xa2', "")
2114 _check_decode(b'\x88', "\u8888")
2115
2116 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002117 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2118
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002119 decoder.reset()
2120 _check_decode(b'\n', "\n")
2121 _check_decode(b'\r', "")
2122 _check_decode(b'', "\n", final=True)
2123 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002124
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002125 _check_decode(b'\r', "")
2126 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002127
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002128 _check_decode(b'\r\r\n', "\n\n")
2129 _check_decode(b'\r', "")
2130 _check_decode(b'\r', "\n")
2131 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002132
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002133 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2134 _check_decode(b'\xe8\xa2\x88', "\u8888")
2135 _check_decode(b'\n', "\n")
2136 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2137 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002138
Antoine Pitrou19690592009-06-12 20:14:08 +00002139 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002140 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002141 if encoding is not None:
2142 encoder = codecs.getincrementalencoder(encoding)()
2143 def _decode_bytewise(s):
2144 # Decode one byte at a time
2145 for b in encoder.encode(s):
2146 result.append(decoder.decode(b))
2147 else:
2148 encoder = None
2149 def _decode_bytewise(s):
2150 # Decode one char at a time
2151 for c in s:
2152 result.append(decoder.decode(c))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002153 self.assertEquals(decoder.newlines, None)
2154 _decode_bytewise("abc\n\r")
2155 self.assertEquals(decoder.newlines, '\n')
2156 _decode_bytewise("\nabc")
2157 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2158 _decode_bytewise("abc\r")
2159 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2160 _decode_bytewise("abc")
2161 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2162 _decode_bytewise("abc\r")
2163 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2164 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002165 input = "abc"
2166 if encoder is not None:
2167 encoder.reset()
2168 input = encoder.encode(input)
2169 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002170 self.assertEquals(decoder.newlines, None)
2171
2172 def test_newline_decoder(self):
2173 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002174 # None meaning the IncrementalNewlineDecoder takes unicode input
2175 # rather than bytes input
2176 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002177 'utf-16', 'utf-16-le', 'utf-16-be',
2178 'utf-32', 'utf-32-le', 'utf-32-be',
2179 )
2180 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002181 decoder = enc and codecs.getincrementaldecoder(enc)()
2182 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2183 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002184 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002185 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2186 self.check_newline_decoding_utf8(decoder)
2187
2188 def test_newline_bytes(self):
2189 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2190 def _check(dec):
2191 self.assertEquals(dec.newlines, None)
2192 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2193 self.assertEquals(dec.newlines, None)
2194 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2195 self.assertEquals(dec.newlines, None)
2196 dec = self.IncrementalNewlineDecoder(None, translate=False)
2197 _check(dec)
2198 dec = self.IncrementalNewlineDecoder(None, translate=True)
2199 _check(dec)
2200
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Inherits every IncrementalNewlineDecoderTest case unchanged.

    NOTE(review): presumably bound to the C implementation elsewhere in
    the file -- confirm where IncrementalNewlineDecoder is assigned.
    """
2203
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Inherits every IncrementalNewlineDecoderTest case unchanged.

    NOTE(review): presumably bound to the pure-Python implementation
    elsewhere in the file -- confirm where IncrementalNewlineDecoder is
    assigned.
    """
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002206
Christian Heimes1a6387e2008-03-26 12:49:49 +00002207
2208# XXX Tests for open()
2209
2210class MiscIOTest(unittest.TestCase):
2211
def tearDown(self):
    # Remove the scratch file the tests in this class write to.
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002214
Antoine Pitrou19690592009-06-12 20:14:08 +00002215 def test___all__(self):
2216 for name in self.io.__all__:
2217 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002218 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002219 if name == "open":
2220 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002221 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002222 self.assertTrue(issubclass(obj, Exception), name)
2223 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002224 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002225
def test_attributes(self):
    # Check the `mode` and `name` attributes at each layer of the file
    # objects returned by open().
    # Context managers close the file even if an assertion fails.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    with self.open(support.TESTFN, "U") as f:
        self.assertEqual(f.name, support.TESTFN)
        self.assertEqual(f.buffer.name, support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")

    f = self.open(support.TESTFN, "w+")
    self.assertEqual(f.mode, "w+")
    self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
    self.assertEqual(f.buffer.raw.mode, "rb+")

    # A second file object on the same descriptor reports the descriptor
    # number as its name.
    g = self.open(f.fileno(), "wb", closefd=False)
    self.assertEqual(g.mode, "wb")
    self.assertEqual(g.raw.mode, "wb")
    self.assertEqual(g.name, f.fileno())
    self.assertEqual(g.raw.name, f.fileno())
    # Closing order is kept as in the original: f first, then g.
    f.close()
    g.close()
2252
def test_io_after_close(self):
    # Every I/O method of a closed file object must raise ValueError,
    # whatever mode and buffering it was opened with.
    open_variants = [
        {"mode": "w"},
        {"mode": "wb"},
        {"mode": "w", "buffering": 1},
        {"mode": "w", "buffering": 2},
        {"mode": "wb", "buffering": 0},
        {"mode": "r"},
        {"mode": "rb"},
        {"mode": "r", "buffering": 1},
        {"mode": "r", "buffering": 2},
        {"mode": "rb", "buffering": 0},
        {"mode": "w+"},
        {"mode": "w+b"},
        {"mode": "w+", "buffering": 1},
        {"mode": "w+", "buffering": 2},
        {"mode": "w+b", "buffering": 0},
    ]

    def expect_closed_error(method, *args):
        self.assertRaises(ValueError, method, *args)

    for kwargs in open_variants:
        f = self.open(support.TESTFN, **kwargs)
        f.close()
        # Binary files take bytes, text files take text.
        if "b" in kwargs['mode']:
            empty_payload = b""
        else:
            empty_payload = ""

        expect_closed_error(f.flush)
        expect_closed_error(f.fileno)
        expect_closed_error(f.isatty)
        expect_closed_error(f.__iter__)
        # peek/read1/readinto only exist on some of the returned types.
        if hasattr(f, "peek"):
            expect_closed_error(f.peek, 1)
        expect_closed_error(f.read)
        if hasattr(f, "read1"):
            expect_closed_error(f.read1, 1024)
        if hasattr(f, "readinto"):
            expect_closed_error(f.readinto, bytearray(1024))
        expect_closed_error(f.readline)
        expect_closed_error(f.readlines)
        expect_closed_error(f.seek, 0)
        expect_closed_error(f.tell)
        expect_closed_error(f.truncate)
        expect_closed_error(f.write, empty_payload)
        expect_closed_error(f.writelines, [])
        expect_closed_error(next, f)
2293
def test_blockingioerror(self):
    # Various BlockingIOError issues

    # Each of these argument lists must be rejected with TypeError.
    for bad_args in ((), (1,), (1, 2, 3, 4), (1, "", None)):
        self.assertRaises(TypeError, self.BlockingIOError, *bad_args)

    # With the two-argument form, characters_written is 0.
    self.assertEqual(self.BlockingIOError(1, "").characters_written, 0)

    # Build a reference cycle between an exception and its argument, drop
    # every name that refers to either object, force a collection, and
    # check through the weak reference that the argument was freed.
    # A unicode subclass is used because its instances accept the
    # attribute assignment and the weak reference needed below.
    class C(unicode):
        pass
    message = C("")
    error = self.BlockingIOError(1, message)
    message.b = error
    error.c = message
    wr = weakref.ref(message)
    del message, error
    support.gc_collect()
    self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002312
def test_abcs(self):
    # Test the visible base classes are ABCs, i.e. that each of them was
    # created by the abc.ABCMeta metaclass.
    #
    # assertIsInstance is used instead of assertTrue(isinstance(...)) so
    # that a failure names the offending class and the expected type
    # rather than reporting a bare "False is not true".
    self.assertIsInstance(self.IOBase, abc.ABCMeta)
    self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
    self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
    self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
2319
def _check_abc_inheritance(self, abcmodule):
    # Open TESTFN three ways and check which of abcmodule's base classes
    # (IOBase, RawIOBase, BufferedIOBase, TextIOBase) each resulting file
    # object is an instance of.
    #
    # abcmodule is any object exposing those four classes as attributes.
    #
    # The dedicated isinstance assertions are used so that a failure
    # reports the object and the class involved instead of a bare
    # "False is not true" / "True is not false".

    # Binary mode with buffering=0: expected to be a raw stream only.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertIsInstance(f, abcmodule.IOBase)
        self.assertIsInstance(f, abcmodule.RawIOBase)
        self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
        self.assertNotIsInstance(f, abcmodule.TextIOBase)
    # Binary mode with default buffering: expected to be a buffered
    # stream only.
    with self.open(support.TESTFN, "wb") as f:
        self.assertIsInstance(f, abcmodule.IOBase)
        self.assertNotIsInstance(f, abcmodule.RawIOBase)
        self.assertIsInstance(f, abcmodule.BufferedIOBase)
        self.assertNotIsInstance(f, abcmodule.TextIOBase)
    # Text mode: expected to be a text stream only.
    with self.open(support.TESTFN, "w") as f:
        self.assertIsInstance(f, abcmodule.IOBase)
        self.assertNotIsInstance(f, abcmodule.RawIOBase)
        self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
        self.assertIsInstance(f, abcmodule.TextIOBase)
2336
def test_abc_inheritance(self):
    # File objects must derive from the ABCs of the implementation under
    # test.  The test case itself is passed as the "module" because the
    # base classes are looked up as attributes of it.
    self._check_abc_inheritance(abcmodule=self)
2340
def test_abc_inheritance_official(self):
    # File objects must also derive from the official ABCs, i.e. the ones
    # exported by the baseline "io" module, whichever implementation is
    # under test.
    self._check_abc_inheritance(abcmodule=io)
2345
class CMiscIOTest(MiscIOTest):
    # Runs the MiscIOTest cases with the "io" module (imported at the top
    # of this file as the C implementation) exposed as the io attribute.
    io = io
2348
class PyMiscIOTest(MiscIOTest):
    # Runs the MiscIOTest cases with "_pyio" (imported at the top of this
    # file as the Python implementation) exposed as the io attribute.
    io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002351
def test_main():
    # Entry point: inject the io implementation under test (plus the
    # matching mock classes) into every test class, then run them all.
    tests = (
        CIOTest, PyIOTest,
        CBufferedReaderTest, PyBufferedReaderTest,
        CBufferedWriterTest, PyBufferedWriterTest,
        CBufferedRWPairTest, PyBufferedRWPairTest,
        CBufferedRandomTest, PyBufferedRandomTest,
        StatefulIncrementalDecoderTest,
        CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
        CTextIOWrapperTest, PyTextIOWrapperTest,
        CMiscIOTest, PyMiscIOTest,
    )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}

    # Each mock is registered under its unprefixed name, but resolves to
    # the "C"- or "Py"-prefixed class defined at module level.
    globs = globals()
    for prefix, namespace in (("C", c_io_ns), ("Py", py_io_ns)):
        for mock in mocks:
            namespace[mock.__name__] = globs[prefix + mock.__name__]

    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    # The class-name prefix selects the namespace; classes with neither
    # prefix are left untouched.
    for test in tests:
        test_name = test.__name__
        if test_name.startswith("C"):
            namespace = c_io_ns
        elif test_name.startswith("Py"):
            namespace = py_io_ns
        else:
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002385
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()