blob: 6f136f242a6145e0201c1f9fcd301e21432cf234 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#   (only enabled with -ulargefile)
12
################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import threading
30import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000031import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000032import warnings
33import weakref
34import gc
35import abc
36from itertools import chain, cycle, count
37from collections import deque
38from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000039
40import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
43
44__metaclass__ = type
45bytes = support.py3k_bytes
46
47def _default_chunk_size():
48 """Get the default TextIOWrapper chunk size"""
49 with io.open(__file__, "r", encoding="latin1") as f:
50 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000051
52
class MockRawIO:
    """Scripted raw stream for exercising the buffered I/O layers.

    Reads are answered from a queue of chunks given to the constructor and
    every write is recorded, so a test can inspect how a buffered object
    used the raw layer.  The class defines no base of its own; it is meant
    to be mixed with a ``RawIOBase`` implementation.
    """

    def __init__(self, read_stack=()):
        # Chunks still to be handed out by read()/readinto(), in order.
        self._read_stack = list(read_stack)
        # One bytes object per write() call, in call order.
        self._write_stack = []
        # How many times read() or readinto() has been called.
        self._reads = 0

    def read(self, n=None):
        """Pop and return the next scripted chunk; *n* is ignored.

        Returns b"" once every chunk has been consumed.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script means EOF.  Anything else (including
            # KeyboardInterrupt) must propagate instead of being swallowed.
            return b""

    def write(self, b):
        """Record a bytes copy of *b* and report all of it as written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary fixed descriptor number.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy as much of the next scripted chunk as fits into *buf*.

        Returns the number of bytes stored, 0 when the script is empty, or
        None (consuming the entry) when the next scripted item is None.
        A chunk larger than *buf* is split and the remainder stays queued.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        # The chunk does not fit: fill the buffer, keep the tail for later.
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        """Pretend to truncate; simply echo *pos* back."""
        return pos
111
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with the C implementation's RawIOBase."""
114
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with the pure-Python (_pyio) RawIOBase."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000117
118
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods deliberately report bogus results.

    Sizes and payloads are inflated and positions are negative, so tests
    can observe how a buffered object copes with such a raw stream.
    """

    def write(self, b):
        # Claim twice as many bytes as were actually recorded.
        recorded = MockRawIO.write(self, b)
        return recorded * 2

    def read(self, n=None):
        # Hand back the scripted chunk repeated twice.
        chunk = MockRawIO.read(self, n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Fill the buffer normally, then report five times its capacity.
        MockRawIO.readinto(self, buf)
        capacity = len(buf)
        return capacity * 5
135
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with the C implementation's RawIOBase."""
138
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with the pure-Python (_pyio) RawIOBase."""
141
142
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() raises IOError; later calls do nothing."""

    # Class-level default; close() replaces it with 1 on the instance.
    closed = 0

    def close(self):
        if self.closed:
            return
        # Mark as closed *before* failing so the error is raised only once.
        self.closed = 1
        raise IOError
150
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with the C implementation's RawIOBase."""
153
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with the pure-Python (_pyio) RawIOBase."""
156
157
class MockFileIO:
    """Mixin that logs the size of every read made on the underlying stream.

    ``read_history`` receives one entry per read()/readinto() call: the
    number of bytes obtained, or None when the underlying call returned None.
    """

    def __init__(self, data):
        # Create the log first, then let the next class in the MRO take data.
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        chunk = super(MockFileIO, self).read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super(MockFileIO, self).readinto(b)
        self.read_history.append(count)
        return count
Christian Heimes1a6387e2008-03-26 12:49:49 +0000173
class CMockFileIO(MockFileIO, io.BytesIO):
    """Read-logging stream backed by the C implementation's BytesIO."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000176
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """Read-logging stream backed by the pure-Python (_pyio) BytesIO."""
179
180
class MockNonBlockWriterIO:
    """Raw writer mock that can simulate a write blocking part-way through.

    Subclasses must provide a ``BlockingIOError`` attribute naming the
    exception class that write() raises.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and empty the record."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b*, or only its part before the armed blocker char.

        When the blocker char occurs in *b*, the bytes preceding it are
        recorded, the blocker is disarmed and BlockingIOError is raised with
        the count of recorded bytes as its third argument.
        """
        data = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                cut = data.index(blocker)
            except ValueError:
                # Blocker not in this payload: fall through to a full write.
                pass
            else:
                self._blocker_char = None
                self._write_stack.append(data[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(data)
        return len(data)
219
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """Blocking-writer mock bound to the C implementation of io."""

    # Exception class raised by write() when the blocker char is hit.
    BlockingIOError = io.BlockingIOError
222
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """Blocking-writer mock bound to the pure-Python (_pyio) implementation."""

    # Exception class raised by write() when the blocker char is hit.
    BlockingIOError = pyio.BlockingIOError
225
Christian Heimes1a6387e2008-03-26 12:49:49 +0000226
227class IOTest(unittest.TestCase):
228
Antoine Pitrou19690592009-06-12 20:14:08 +0000229 def setUp(self):
230 support.unlink(support.TESTFN)
231
Christian Heimes1a6387e2008-03-26 12:49:49 +0000232 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000233 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000234
235 def write_ops(self, f):
236 self.assertEqual(f.write(b"blah."), 5)
237 self.assertEqual(f.seek(0), 0)
238 self.assertEqual(f.write(b"Hello."), 6)
239 self.assertEqual(f.tell(), 6)
240 self.assertEqual(f.seek(-1, 1), 5)
241 self.assertEqual(f.tell(), 5)
242 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
243 self.assertEqual(f.seek(0), 0)
244 self.assertEqual(f.write(b"h"), 1)
245 self.assertEqual(f.seek(-1, 2), 13)
246 self.assertEqual(f.tell(), 13)
247 self.assertEqual(f.truncate(12), 12)
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000248 self.assertEqual(f.tell(), 12)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000249 self.assertRaises(TypeError, f.seek, 0.0)
250
251 def read_ops(self, f, buffered=False):
252 data = f.read(5)
253 self.assertEqual(data, b"hello")
254 data = bytearray(data)
255 self.assertEqual(f.readinto(data), 5)
256 self.assertEqual(data, b" worl")
257 self.assertEqual(f.readinto(data), 2)
258 self.assertEqual(len(data), 5)
259 self.assertEqual(data[:2], b"d\n")
260 self.assertEqual(f.seek(0), 0)
261 self.assertEqual(f.read(20), b"hello world\n")
262 self.assertEqual(f.read(1), b"")
263 self.assertEqual(f.readinto(bytearray(b"x")), 0)
264 self.assertEqual(f.seek(-6, 2), 6)
265 self.assertEqual(f.read(5), b"world")
266 self.assertEqual(f.read(0), b"")
267 self.assertEqual(f.readinto(bytearray()), 0)
268 self.assertEqual(f.seek(-6, 1), 5)
269 self.assertEqual(f.read(5), b" worl")
270 self.assertEqual(f.tell(), 10)
271 self.assertRaises(TypeError, f.seek, 0.0)
272 if buffered:
273 f.seek(0)
274 self.assertEqual(f.read(), b"hello world\n")
275 f.seek(6)
276 self.assertEqual(f.read(), b"world\n")
277 self.assertEqual(f.read(), b"")
278
279 LARGE = 2**31
280
281 def large_file_ops(self, f):
282 assert f.readable()
283 assert f.writable()
284 self.assertEqual(f.seek(self.LARGE), self.LARGE)
285 self.assertEqual(f.tell(), self.LARGE)
286 self.assertEqual(f.write(b"xxx"), 3)
287 self.assertEqual(f.tell(), self.LARGE + 3)
288 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
289 self.assertEqual(f.truncate(), self.LARGE + 2)
290 self.assertEqual(f.tell(), self.LARGE + 2)
291 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
292 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Alexandre Vassalotti1aed6242008-05-09 21:49:43 +0000293 self.assertEqual(f.tell(), self.LARGE + 1)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000294 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
295 self.assertEqual(f.seek(-1, 2), self.LARGE)
296 self.assertEqual(f.read(2), b"x")
297
Antoine Pitrou19690592009-06-12 20:14:08 +0000298 def test_invalid_operations(self):
299 # Try writing on a file opened in read mode and vice-versa.
300 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000301 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000302 self.assertRaises(IOError, fp.read)
303 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000304 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000305 self.assertRaises(IOError, fp.write, b"blah")
306 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000307 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000308 self.assertRaises(IOError, fp.write, "blah")
309 self.assertRaises(IOError, fp.writelines, ["blah\n"])
310
Christian Heimes1a6387e2008-03-26 12:49:49 +0000311 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000312 with self.open(support.TESTFN, "wb", buffering=0) as f:
313 self.assertEqual(f.readable(), False)
314 self.assertEqual(f.writable(), True)
315 self.assertEqual(f.seekable(), True)
316 self.write_ops(f)
317 with self.open(support.TESTFN, "rb", buffering=0) as f:
318 self.assertEqual(f.readable(), True)
319 self.assertEqual(f.writable(), False)
320 self.assertEqual(f.seekable(), True)
321 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000322
323 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000324 with self.open(support.TESTFN, "wb") as f:
325 self.assertEqual(f.readable(), False)
326 self.assertEqual(f.writable(), True)
327 self.assertEqual(f.seekable(), True)
328 self.write_ops(f)
329 with self.open(support.TESTFN, "rb") as f:
330 self.assertEqual(f.readable(), True)
331 self.assertEqual(f.writable(), False)
332 self.assertEqual(f.seekable(), True)
333 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000334
335 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000336 with self.open(support.TESTFN, "wb") as f:
337 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
338 with self.open(support.TESTFN, "rb") as f:
339 self.assertEqual(f.readline(), b"abc\n")
340 self.assertEqual(f.readline(10), b"def\n")
341 self.assertEqual(f.readline(2), b"xy")
342 self.assertEqual(f.readline(4), b"zzy\n")
343 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000344 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000345 self.assertRaises(TypeError, f.readline, 5.3)
346 with self.open(support.TESTFN, "r") as f:
347 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000348
349 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000350 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000351 self.write_ops(f)
352 data = f.getvalue()
353 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000354 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000355 self.read_ops(f, True)
356
357 def test_large_file_ops(self):
358 # On Windows and Mac OSX this test comsumes large resources; It takes
359 # a long time to build the >2GB file and takes >2GB of disk space
360 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000361 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
362 if not support.is_resource_enabled("largefile"):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000363 print("\nTesting large file ops skipped on %s." % sys.platform,
364 file=sys.stderr)
365 print("It requires %d bytes and a long time." % self.LARGE,
366 file=sys.stderr)
367 print("Use 'regrtest.py -u largefile test_io' to run it.",
368 file=sys.stderr)
369 return
Antoine Pitrou19690592009-06-12 20:14:08 +0000370 with self.open(support.TESTFN, "w+b", 0) as f:
371 self.large_file_ops(f)
372 with self.open(support.TESTFN, "w+b") as f:
373 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000374
375 def test_with_open(self):
376 for bufsize in (0, 1, 100):
377 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000378 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000379 f.write(b"xxx")
380 self.assertEqual(f.closed, True)
381 f = None
382 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000383 with self.open(support.TESTFN, "wb", bufsize) as f:
Senthil Kumarance8e33a2010-01-08 19:04:16 +0000384 1/0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000385 except ZeroDivisionError:
386 self.assertEqual(f.closed, True)
387 else:
Senthil Kumarance8e33a2010-01-08 19:04:16 +0000388 self.fail("1/0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000389
Antoine Pitroue741cc62009-01-21 00:45:36 +0000390 # issue 5008
391 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000392 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000393 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000394 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000395 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000396 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000397 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000398 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000399 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000400
Christian Heimes1a6387e2008-03-26 12:49:49 +0000401 def test_destructor(self):
402 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000403 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000404 def __del__(self):
405 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000406 try:
407 f = super(MyFileIO, self).__del__
408 except AttributeError:
409 pass
410 else:
411 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000412 def close(self):
413 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000414 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000415 def flush(self):
416 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000417 super(MyFileIO, self).flush()
418 f = MyFileIO(support.TESTFN, "wb")
419 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000420 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000421 support.gc_collect()
422 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000423 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000424 self.assertEqual(f.read(), b"xxx")
425
426 def _check_base_destructor(self, base):
427 record = []
428 class MyIO(base):
429 def __init__(self):
430 # This exercises the availability of attributes on object
431 # destruction.
432 # (in the C version, close() is called by the tp_dealloc
433 # function, not by __del__)
434 self.on_del = 1
435 self.on_close = 2
436 self.on_flush = 3
437 def __del__(self):
438 record.append(self.on_del)
439 try:
440 f = super(MyIO, self).__del__
441 except AttributeError:
442 pass
443 else:
444 f()
445 def close(self):
446 record.append(self.on_close)
447 super(MyIO, self).close()
448 def flush(self):
449 record.append(self.on_flush)
450 super(MyIO, self).flush()
451 f = MyIO()
452 del f
453 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000454 self.assertEqual(record, [1, 2, 3])
455
Antoine Pitrou19690592009-06-12 20:14:08 +0000456 def test_IOBase_destructor(self):
457 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000458
Antoine Pitrou19690592009-06-12 20:14:08 +0000459 def test_RawIOBase_destructor(self):
460 self._check_base_destructor(self.RawIOBase)
461
462 def test_BufferedIOBase_destructor(self):
463 self._check_base_destructor(self.BufferedIOBase)
464
465 def test_TextIOBase_destructor(self):
466 self._check_base_destructor(self.TextIOBase)
467
468 def test_close_flushes(self):
469 with self.open(support.TESTFN, "wb") as f:
470 f.write(b"xxx")
471 with self.open(support.TESTFN, "rb") as f:
472 self.assertEqual(f.read(), b"xxx")
473
474 def test_array_writes(self):
475 a = array.array(b'i', range(10))
476 n = len(a.tostring())
477 with self.open(support.TESTFN, "wb", 0) as f:
478 self.assertEqual(f.write(a), n)
479 with self.open(support.TESTFN, "wb") as f:
480 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000481
482 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000483 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000484 closefd=False)
485
Antoine Pitrou19690592009-06-12 20:14:08 +0000486 def test_read_closed(self):
487 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000488 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000489 with self.open(support.TESTFN, "r") as f:
490 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000491 self.assertEqual(file.read(), "egg\n")
492 file.seek(0)
493 file.close()
494 self.assertRaises(ValueError, file.read)
495
496 def test_no_closefd_with_filename(self):
497 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000498 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000499
500 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000501 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000502 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000503 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000504 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000505 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000506 self.assertEqual(file.buffer.raw.closefd, False)
507
Antoine Pitrou19690592009-06-12 20:14:08 +0000508 def test_garbage_collection(self):
509 # FileIO objects are collected, and collecting them flushes
510 # all data to disk.
511 f = self.FileIO(support.TESTFN, "wb")
512 f.write(b"abcxxx")
513 f.f = f
514 wr = weakref.ref(f)
515 del f
516 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000517 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000518 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000519 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000520
Antoine Pitrou19690592009-06-12 20:14:08 +0000521 def test_unbounded_file(self):
522 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
523 zero = "/dev/zero"
524 if not os.path.exists(zero):
525 self.skipTest("{0} does not exist".format(zero))
526 if sys.maxsize > 0x7FFFFFFF:
527 self.skipTest("test can only run in a 32-bit address space")
528 if support.real_max_memuse < support._2G:
529 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000530 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000531 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000532 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000533 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000534 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000535 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000536
class CIOTest(IOTest):
    """IOTest variant intended for the C implementation of io."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000539
class PyIOTest(IOTest):
    """IOTest variant intended for the pure-Python implementation of io."""

    # Replace the inherited test with a version that is always skipped.
    test_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000544
545
class CommonBufferedTests:
    """Mixin with tests common to BufferedReader, BufferedWriter and
    BufferedRandom.

    The concrete test class supplies ``tp`` (the buffered type under test)
    and the mock factories ``MockRawIO`` and ``CloseFailureIO``.
    """
    # Test methods carry comments rather than docstrings so that unittest's
    # verbose output keeps showing the method names.

    def test_detach(self):
        # detach() hands back the raw stream once; a second call must fail.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        # Destroying a subclass must call __del__, then close(), and flush()
        # as well when the buffered object is writable.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyBufferedIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyBufferedIO, self).close()
            def flush(self):
                record.append(3)
                super(MyBufferedIO, self).flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        # repr() shows the qualified class name, plus the raw stream's name
        # once it has one.
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000637
638
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    """Tests of the buffered reader type supplied as ``self.tp``.

    The mock factories (``MockRawIO``, ``MockFileIO``, ``MisbehavedRawIO``)
    and ``open`` are likewise reached through ``self`` and are not defined
    in this class.
    """
    # Test methods carry comments rather than docstrings so that unittest's
    # verbose output keeps showing the method names.

    # Mode used by test_threads() to reopen the data file.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be called again on a live object; non-positive buffer
        # sizes must be rejected.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # read1() must be satisfied from the buffer when possible; the raw
        # read counter shows when the raw stream was actually consulted.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # A short read only overwrites the front of the buffer.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")

    def test_readlines(self):
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes requested from the buffered reader,
        # sizes the raw stream is then expected to have delivered.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

    def test_read_past_eof(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as f:
                f.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=reader) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(combined.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # Bogus positions reported by the raw stream must surface as IOError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
794 self.assertRaises(IOError, bufio.tell)
795
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the BufferedReaderTest cases against the C implementation and adds
    # checks that only apply to it.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every failed re-initialization the object must refuse to
        # read instead of using a half-initialized buffer.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening TESTFN in "w+b" mode creates it; make sure it is removed
        # again whether or not the assertions below pass.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.f = f  # reference cycle: only the cyclic GC can reclaim the object
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000836
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited BufferedReaderTest cases against the pure-Python
    # implementation (_pyio, imported as pyio at the top of the file).
    tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000839
840
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Implementation-independent tests for buffered writers.  Subclasses set
    # ``tp`` to the concrete class under test.
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-running __init__ with valid sizes must leave a usable object.
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every write so callers
        # can interleave other operations.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents,
            b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush the pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN; remove it afterwards even on failure.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 3)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            # Going through bytearray yields one byte per integer whether or
            # not bytes() behaves like the Python 3 constructor; the reader
            # variant of this test builds its data the same way.
            contents = bytes(bytearray(range(256))) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunks may land in any order, but each byte value must occur
            # exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes(bytearray([i]))), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing the third positional argument must emit exactly one
        # DeprecationWarning with the expected message.
        with support.check_warnings() as w:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), 8, 12)
            self.assertEqual(len(w.warnings), 1)
            warning = w.warnings[0]
            self.assertIs(warning.category, DeprecationWarning)
            self.assertEqual(str(warning.message),
                             "max_buffer_size is deprecated")
1060
1061
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the BufferedWriterTest cases against the C implementation and adds
    # checks that only apply to it.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every failed re-initialization the object must refuse to
        # write instead of using a half-initialized buffer.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening TESTFN in "w+b" mode creates it; make sure it is removed
        # again whether or not the assertions below pass.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        f.x = f  # reference cycle: only the cyclic GC can reclaim the object
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1099
1100
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the inherited BufferedWriterTest cases against the pure-Python
    # implementation (_pyio, imported as pyio at the top of the file).
    tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001103
class BufferedRWPairTest(unittest.TestCase):
    # Implementation-independent tests for BufferedRWPair.  Subclasses set
    # ``tp`` to the concrete class under test.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing the fourth positional argument must emit exactly one
        # DeprecationWarning with the expected message.
        with support.check_warnings() as w:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
            self.assertEqual(len(w.warnings), 1)
            warning = w.warnings[0]
            self.assertIs(warning.category, DeprecationWarning)
            self.assertEqual(str(warning.message),
                             "max_buffer_size is deprecated")

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # A fresh pair is built for every call so each one starts at offset 0.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() must not consume: the following read() sees the same bytes.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            # Raw stream whose isatty() answer is chosen at construction.
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side does.
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
1226
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the inherited BufferedRWPairTest cases against the C
    # implementation from the io module.
    tp = io.BufferedRWPair
1229
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the inherited BufferedRWPairTest cases against the pure-Python
    # implementation (_pyio, imported as pyio at the top of the file).
    tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001232
1233
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Implementation-independent tests for read/write buffered objects.
    # Inherits both the reader and the writer test cases and adds tests that
    # mix the two kinds of operation on one object.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # read_func(bufio[, n]) must behave like bufio.read: return the data
        # and advance the position by its length.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not move the position; advance it by hand.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)
1380
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
    # Runs the reader, writer and random-access test cases against the C
    # implementation of BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # On 32-bit builds the allocation below can actually succeed (e.g.
        # more than 2GB RAM with a 64-bit kernel), so only check elsewhere.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        expected = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected, bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Run the reader check first, then the writer check, on this class.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1397
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the inherited BufferedRandomTest cases against the pure-Python
    # implementation (_pyio, imported as pyio at the top of the file).
    tp = pyio.BufferedRandom
1400
1401
Christian Heimes1a6387e2008-03-26 12:49:49 +00001402# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1403# properties:
1404# - A single output character can correspond to many bytes of input.
1405# - The number of input bytes to complete the character can be
1406# undetermined until the last input byte is received.
1407# - The number of input bytes can vary depending on previous input.
1408# - A single input byte can correspond to many characters of output.
1409# - The number of output characters can be undetermined until the
1410# last input byte is received.
1411# - The number of output characters can vary depending on previous input.
1412
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed into flags."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed *input* to the decoder and return the text produced so far.

        *input* is a bytes-like object.  If *final* is true, any word still
        buffered at the end of *input* is emitted as if it were terminated.
        """
        output = ''
        period = ord('.')
        # Iterate over integer byte values.  Comparing against the text
        # literal '.' only matches when iteration yields 1-char strings; for
        # bytearray input (and any bytes object on Python 3) it yields ints
        # and the terminator would never be recognised.
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        Words starting with 'i' or 'o' update I or O and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Lookups for 'test_decoder' only succeed while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: resolve 'test_decoder' while enabled.

        Returns None (by falling through) for any other name or while
        codecEnabled is false.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
1497
# Register the lookup function of the decoder defined above so that tests
# can request it by codec name.
# Disabled by default, tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1501
1502
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, eof, expected in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(data, eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
1545
class TextIOWrapperTest(unittest.TestCase):
    """Implementation-independent tests for TextIOWrapper.

    The classes and functions under test are looked up as attributes of
    ``self`` (``self.TextIOWrapper``, ``self.BytesIO``, ``self.open``, ...),
    so the same tests can be run against more than one implementation.
    """

    def setUp(self):
        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test_constructor(self):
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        # Calling __init__ again must reconfigure the existing object.
        t.__init__(b, encoding="latin1", newline="\r\n")
        self.assertEqual(t.encoding, "latin1")
        self.assertEqual(t.line_buffering, False)
        t.__init__(b, encoding="utf8", line_buffering=True)
        self.assertEqual(t.encoding, "utf8")
        self.assertEqual(t.line_buffering, True)
        self.assertEqual("\xe9\n", t.readline())
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')

    def test_detach(self):
        r = self.BytesIO()
        b = self.BufferedWriter(r)
        t = self.TextIOWrapper(b)
        self.assertIs(t.detach(), b)

        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("howdy")
        self.assertFalse(r.getvalue())
        # detach() is expected to flush pending data to the raw stream.
        t.detach()
        self.assertEqual(r.getvalue(), b"howdy")
        self.assertRaises(ValueError, t.detach)

    def test_repr(self):
        raw = self.BytesIO("hello".encode("utf-8"))
        b = self.BufferedReader(raw)
        t = self.TextIOWrapper(b, encoding="utf-8")
        modname = self.TextIOWrapper.__module__
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper encoding='utf-8'>" % modname)
        raw.name = "dummy"
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
        raw.name = b"dummy"
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)

    def test_line_buffering(self):
        r = self.BytesIO()
        b = self.BufferedWriter(r, 1000)
        t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
        t.write("X")
        self.assertEqual(r.getvalue(), b"")  # No flush happened
        t.write("Y\nZ")
        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
        t.write("A\rB")
        self.assertEqual(r.getvalue(), b"XY\nZA\rB")

    def test_encoding(self):
        # Check the encoding attribute is always set, and valid
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="utf8")
        self.assertEqual(t.encoding, "utf8")
        t = self.TextIOWrapper(b)
        self.assertIsNotNone(t.encoding)
        codecs.lookup(t.encoding)

    def test_encoding_errors_reading(self):
        # (1) default
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.read)
        # (2) explicit strict
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.read)
        # (3) ignore
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
        self.assertEqual(t.read(), "abc\n\n")
        # (4) replace
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
        self.assertEqual(t.read(), "abc\n\ufffd\n")

    def test_encoding_errors_writing(self):
        # (1) default
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (2) explicit strict
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (3) ignore
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
                               newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(b.getvalue(), b"abcdef\n")
        # (4) replace
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
                               newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(b.getvalue(), b"abc?def\n")

    def test_newlines(self):
        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

        # Each entry is [newline argument, lines expected when reading].
        tests = [
            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
            [ '', input_lines ],
            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
        ]
        encodings = (
            'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )

        # Try a range of buffer sizes to test the case where \r is the last
        # character in TextIOWrapper._pending_line.
        for encoding in encodings:
            # XXX: str.encode() should return bytes
            data = bytes(''.join(input_lines).encode(encoding))
            for do_reads in (False, True):
                for bufsize in range(1, 10):
                    for newline, exp_lines in tests:
                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                        textio = self.TextIOWrapper(bufio, newline=newline,
                                                    encoding=encoding)
                        if do_reads:
                            # Mix read(2) and readline() on the same stream.
                            got_lines = []
                            while True:
                                c2 = textio.read(2)
                                if c2 == '':
                                    break
                                self.assertEqual(len(c2), 2)
                                got_lines.append(c2 + textio.readline())
                        else:
                            got_lines = list(textio)

                        for got_line, exp_line in zip(got_lines, exp_lines):
                            self.assertEqual(got_line, exp_line)
                        self.assertEqual(len(got_lines), len(exp_lines))

    def test_newlines_input(self):
        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
        for newline, expected in [
            (None, normalized.decode("ascii").splitlines(True)),
            ("", testdata.decode("ascii").splitlines(True)),
            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
            ]:
            buf = self.BytesIO(testdata)
            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
            self.assertEqual(txt.readlines(), expected)
            txt.seek(0)
            self.assertEqual(txt.read(), "".join(expected))

    def test_newlines_output(self):
        testdict = {
            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
            }
        # newline=None is expected to behave like the platform's os.linesep.
        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
        for newline, expected in tests:
            buf = self.BytesIO()
            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
            txt.write("AAA\nB")
            txt.write("BB\nCCC\n")
            txt.write("X\rY\r\nZ")
            txt.flush()
            self.assertEqual(buf.closed, False)
            self.assertEqual(buf.getvalue(), expected)

    def test_destructor(self):
        l = []
        base = self.BytesIO
        class MyBytesIO(base):
            def close(self):
                # Record what had reached the buffer by the time of close().
                l.append(self.getvalue())
                base.close(self)
        b = MyBytesIO()
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("abc")
        del t
        support.gc_collect()
        self.assertEqual([b"abc"], l)

    def test_override_destructor(self):
        record = []
        class MyTextIO(self.TextIOWrapper):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyTextIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyTextIO, self).close()
            def flush(self):
                record.append(3)
                super(MyTextIO, self).flush()
        b = self.BytesIO()
        t = MyTextIO(b, encoding="ascii")
        del t
        support.gc_collect()
        # Expected call order: __del__, then close(), then flush().
        self.assertEqual(record, [1, 2, 3])

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.TextIOWrapper(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    # Systematic tests of the text I/O API

    def test_basic_io(self):
        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
            for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
                # "with" closes the file even when an assertion fails.
                with self.open(support.TESTFN, "w+", encoding=enc) as f:
                    f._CHUNK_SIZE = chunksize
                    self.assertEqual(f.write("abc"), 3)
                with self.open(support.TESTFN, "r+", encoding=enc) as f:
                    f._CHUNK_SIZE = chunksize
                    self.assertEqual(f.tell(), 0)
                    self.assertEqual(f.read(), "abc")
                    cookie = f.tell()
                    self.assertEqual(f.seek(0), 0)
                    self.assertEqual(f.read(None), "abc")
                    f.seek(0)
                    self.assertEqual(f.read(2), "ab")
                    self.assertEqual(f.read(1), "c")
                    self.assertEqual(f.read(1), "")
                    self.assertEqual(f.read(), "")
                    self.assertEqual(f.tell(), cookie)
                    self.assertEqual(f.seek(0), 0)
                    self.assertEqual(f.seek(0, 2), cookie)
                    self.assertEqual(f.write("def"), 3)
                    self.assertEqual(f.seek(cookie), cookie)
                    self.assertEqual(f.read(), "def")
                    if enc.startswith("utf"):
                        self.multi_line_test(f, enc)

    def multi_line_test(self, f, enc):
        # Helper for test_basic_io: write lines of assorted lengths to the
        # open file *f*, then check that readline() returns the same lines
        # at the same tell() positions.  *enc* is accepted but not used here.
        f.seek(0)
        f.truncate()
        sample = "s\xff\u0fff\uffff"
        wlines = []
        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
            line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
            wlines.append((f.tell(), line))
            f.write(line)
        f.seek(0)
        rlines = []
        while True:
            pos = f.tell()
            line = f.readline()
            if not line:
                break
            rlines.append((pos, line))
        self.assertEqual(rlines, wlines)

    def test_telling(self):
        with self.open(support.TESTFN, "w+", encoding="utf8") as f:
            p0 = f.tell()
            f.write("\xff\n")
            p1 = f.tell()
            f.write("\xff\n")
            p2 = f.tell()
            f.seek(0)
            self.assertEqual(f.tell(), p0)
            self.assertEqual(f.readline(), "\xff\n")
            self.assertEqual(f.tell(), p1)
            self.assertEqual(f.readline(), "\xff\n")
            self.assertEqual(f.tell(), p2)
            f.seek(0)
            for line in f:
                self.assertEqual(line, "\xff\n")
                # tell() is expected to fail while iteration is in progress.
                self.assertRaises(IOError, f.tell)
            self.assertEqual(f.tell(), p2)

    def test_seeking(self):
        chunk_size = _default_chunk_size()
        prefix_size = chunk_size - 2
        u_prefix = "a" * prefix_size
        prefix = bytes(u_prefix.encode("utf-8"))
        self.assertEqual(len(u_prefix), len(prefix))
        u_suffix = "\u8888\n"
        suffix = bytes(u_suffix.encode("utf-8"))
        line = prefix + suffix
        with self.open(support.TESTFN, "wb") as f:
            f.write(line*2)
        # The reading handle used to be left open; close it deterministically.
        with self.open(support.TESTFN, "r", encoding="utf-8") as f:
            s = f.read(prefix_size)
            self.assertEqual(s, prefix.decode("ascii"))
            self.assertEqual(f.tell(), prefix_size)
            self.assertEqual(f.readline(), u_suffix)

    def test_seeking_too(self):
        # Regression test for a specific bug
        data = b'\xe0\xbf\xbf\n'
        with self.open(support.TESTFN, "wb") as f:
            f.write(data)
        with self.open(support.TESTFN, "r", encoding="utf-8") as f:
            f._CHUNK_SIZE  # Just test that it exists
            f._CHUNK_SIZE = 2
            f.readline()
            f.tell()

    def test_seek_and_tell(self):
        #Test seek/tell using the StatefulIncrementalDecoder.
        # Make test faster by doing smaller seeks
        CHUNK_SIZE = 128

        def check_seek_and_tell_with_data(data, min_pos=0):
            """Tell/seek to various points within a data stream and ensure
            that the decoded data returned by read() is consistent."""
            with self.open(support.TESTFN, 'wb') as f:
                f.write(data)
            with self.open(support.TESTFN, encoding='test_decoder') as f:
                f._CHUNK_SIZE = CHUNK_SIZE
                decoded = f.read()

            for i in range(min_pos, len(decoded) + 1): # seek positions
                for j in [1, 5, len(decoded) - i]: # read lengths
                    with self.open(support.TESTFN,
                                   encoding='test_decoder') as f:
                        self.assertEqual(f.read(i), decoded[:i])
                        cookie = f.tell()
                        self.assertEqual(f.read(j), decoded[i:i + j])
                        f.seek(cookie)
                        self.assertEqual(f.read(), decoded[i:])

        # Enable the test decoder.
        StatefulIncrementalDecoder.codecEnabled = 1

        # Run the tests.
        try:
            # Try each test case.
            for case, _, _ in StatefulIncrementalDecoderTest.test_cases:
                check_seek_and_tell_with_data(case)

            # Position each test case so that it crosses a chunk boundary.
            for case, _, _ in StatefulIncrementalDecoderTest.test_cases:
                offset = CHUNK_SIZE - len(case)//2
                prefix = b'.'*offset
                # Don't bother seeking into the prefix (takes too long).
                min_pos = offset*2
                check_seek_and_tell_with_data(prefix + case, min_pos)

        # Ensure our test decoder won't interfere with subsequent tests.
        finally:
            StatefulIncrementalDecoder.codecEnabled = 0

    def test_encoded_writes(self):
        data = "1234567890"
        tests = ("utf-16",
                 "utf-16-le",
                 "utf-16-be",
                 "utf-32",
                 "utf-32-le",
                 "utf-32-be")
        for encoding in tests:
            buf = self.BytesIO()
            f = self.TextIOWrapper(buf, encoding=encoding)
            # Check if the BOM is written only once (see issue1753).
            f.write(data)
            f.write(data)
            f.seek(0)
            self.assertEqual(f.read(), data * 2)
            f.seek(0)
            self.assertEqual(f.read(), data * 2)
            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))

    def test_unreadable(self):
        class UnReadable(self.BytesIO):
            def readable(self):
                return False
        txt = self.TextIOWrapper(UnReadable())
        self.assertRaises(IOError, txt.read)

    def test_read_one_by_one(self):
        txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
        reads = ""
        while True:
            c = txt.read(1)
            if not c:
                break
            reads += c
        self.assertEqual(reads, "AA\nBB")

    def test_readlines(self):
        txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
        self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
        txt.seek(0)
        self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
        txt.seek(0)
        self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])

    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
    def test_read_by_chunk(self):
        # make sure "\r\n" straddles 128 char boundary.
        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
        reads = ""
        while True:
            c = txt.read(128)
            if not c:
                break
            reads += c
        self.assertEqual(reads, "A"*127+"\nB")

    def test_issue1395_1(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

        # read one char at a time
        reads = ""
        while True:
            c = txt.read(1)
            if not c:
                break
            reads += c
        self.assertEqual(reads, self.normalized)

    def test_issue1395_2(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = ""
        while True:
            c = txt.read(4)
            if not c:
                break
            reads += c
        self.assertEqual(reads, self.normalized)

    def test_issue1395_3(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = txt.read(4)
        reads += txt.read(4)
        reads += txt.readline()
        reads += txt.readline()
        reads += txt.readline()
        self.assertEqual(reads, self.normalized)

    def test_issue1395_4(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = txt.read(4)
        reads += txt.read()
        self.assertEqual(reads, self.normalized)

    def test_issue1395_5(self):
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        # Advance past the first line; the text read is not needed itself.
        txt.read(4)
        pos = txt.tell()
        txt.seek(0)
        txt.seek(pos)
        self.assertEqual(txt.read(4), "BBB\n")

    def test_issue2282(self):
        buffer = self.BytesIO(self.testdata)
        txt = self.TextIOWrapper(buffer, encoding="ascii")

        self.assertEqual(buffer.seekable(), txt.seekable())

    @unittest.skip("Issue #6213 with incremental encoders")
    def test_append_bom(self):
        # The BOM is not written again when appending to a non-empty file
        filename = support.TESTFN
        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
            with self.open(filename, 'w', encoding=charset) as f:
                f.write('aaa')
                pos = f.tell()
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'aaa'.encode(charset))

            with self.open(filename, 'a', encoding=charset) as f:
                f.write('xxx')
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))

    @unittest.skip("Issue #6213 with incremental encoders")
    def test_seek_bom(self):
        # Same test, but when seeking manually
        filename = support.TESTFN
        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
            with self.open(filename, 'w', encoding=charset) as f:
                f.write('aaa')
                pos = f.tell()
            with self.open(filename, 'r+', encoding=charset) as f:
                f.seek(pos)
                f.write('zzz')
                f.seek(0)
                f.write('bbb')
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))

    def test_errors_property(self):
        with self.open(support.TESTFN, "w") as f:
            self.assertEqual(f.errors, "strict")
        with self.open(support.TESTFN, "w", errors="replace") as f:
            self.assertEqual(f.errors, "replace")

    def test_threads_write(self):
        # Issue6750: concurrent writes could duplicate data
        event = threading.Event()
        with self.open(support.TESTFN, "w", buffering=1) as f:
            def run(n):
                text = "Thread%03d\n" % n
                # Hold every thread until all have started, then release
                # them together.
                event.wait()
                f.write(text)
            threads = [threading.Thread(target=lambda n=x: run(n))
                       for x in range(20)]
            for t in threads:
                t.start()
            time.sleep(0.02)
            event.set()
            for t in threads:
                t.join()
        with self.open(support.TESTFN) as f:
            content = f.read()
            for n in range(20):
                self.assertEqual(content.count("Thread%03d\n" % n), 1)
2112
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest plus checks specific to the C implementation."""

    def test_initialization(self):
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        # After each rejected __init__ call, read() is expected to raise
        # ValueError.
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.read)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
        self.assertRaises(ValueError, t.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        b = self.BufferedWriter(rawio)
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("456def")
        # Create a reference cycle so that only the cyclic GC can free it.
        t.x = t
        wr = weakref.ref(t)
        del t
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
2139
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Runs the inherited TextIOWrapperTest tests; adds none of its own."""
2142
2143
class IncrementalNewlineDecoderTest(unittest.TestCase):
    """Tests for IncrementalNewlineDecoder.

    The class under test is looked up as ``self.IncrementalNewlineDecoder``.
    """

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        _check_decode(b'\xe8\xa2\x88', "\u8888")

        # The same character fed one byte at a time.
        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        # A truncated sequence must be reported once input is final.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_decode(b'\n', "\n")
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        # Feed *decoder* text piece by piece and check both the translated
        # output and the "newlines" attribute.  When *encoding* is None the
        # decoder is fed text directly instead of encoded bytes.
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(b))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertEqual(decoder.newlines, None)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        decoder.reset()
        text = "abc"
        if encoder is not None:
            encoder.reset()
            text = encoder.encode(text)
        self.assertEqual(decoder.decode(text), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                decoder = None
            else:
                decoder = codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
            self.check_newline_decoding(decoder, enc)
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
            self.assertEqual(dec.newlines, None)
        dec = self.IncrementalNewlineDecoder(None, translate=False)
        _check(dec)
        dec = self.IncrementalNewlineDecoder(None, translate=True)
        _check(dec)
2249
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Runs the inherited IncrementalNewlineDecoderTest tests unchanged."""
2252
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Runs the inherited IncrementalNewlineDecoderTest tests unchanged."""
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002255
Christian Heimes1a6387e2008-03-26 12:49:49 +00002256
2257# XXX Tests for open()
2258
2259class MiscIOTest(unittest.TestCase):
2260
def tearDown(self):
    # Remove the scratch file that the tests in this class may have created.
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002263
def test___all__(self):
    # Every name listed in __all__ must exist on the module.  Apart from
    # "open" and the SEEK_* names, each must be either an exception class
    # or an IOBase subclass.
    for name in self.io.__all__:
        obj = getattr(self.io, name, None)
        self.assertIsNotNone(obj, name)
        if name == "open":
            continue
        elif "error" in name.lower() or name == "UnsupportedOperation":
            self.assertTrue(issubclass(obj, Exception), name)
        elif not name.startswith("SEEK_"):
            # Pass the name so that a failure identifies the offender.
            self.assertTrue(issubclass(obj, self.IOBase), name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002274
def test_attributes(self):
    # Check the "mode" and "name" attributes exposed at each layer of the
    # objects returned by open().  "with" closes every file even when an
    # assertion fails.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    with self.open(support.TESTFN, "U") as f:
        self.assertEqual(f.name, support.TESTFN)
        self.assertEqual(f.buffer.name, support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")

    with self.open(support.TESTFN, "w+") as f:
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # A second file object on the same descriptor; closefd=False is
        # passed so that closing it leaves f's descriptor alone.
        with self.open(f.fileno(), "wb", closefd=False) as g:
            self.assertEqual(g.mode, "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name, f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
2301
Antoine Pitrou19690592009-06-12 20:14:08 +00002302 def test_io_after_close(self):
2303 for kwargs in [
2304 {"mode": "w"},
2305 {"mode": "wb"},
2306 {"mode": "w", "buffering": 1},
2307 {"mode": "w", "buffering": 2},
2308 {"mode": "wb", "buffering": 0},
2309 {"mode": "r"},
2310 {"mode": "rb"},
2311 {"mode": "r", "buffering": 1},
2312 {"mode": "r", "buffering": 2},
2313 {"mode": "rb", "buffering": 0},
2314 {"mode": "w+"},
2315 {"mode": "w+b"},
2316 {"mode": "w+", "buffering": 1},
2317 {"mode": "w+", "buffering": 2},
2318 {"mode": "w+b", "buffering": 0},
2319 ]:
2320 f = self.open(support.TESTFN, **kwargs)
2321 f.close()
2322 self.assertRaises(ValueError, f.flush)
2323 self.assertRaises(ValueError, f.fileno)
2324 self.assertRaises(ValueError, f.isatty)
2325 self.assertRaises(ValueError, f.__iter__)
2326 if hasattr(f, "peek"):
2327 self.assertRaises(ValueError, f.peek, 1)
2328 self.assertRaises(ValueError, f.read)
2329 if hasattr(f, "read1"):
2330 self.assertRaises(ValueError, f.read1, 1024)
2331 if hasattr(f, "readinto"):
2332 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2333 self.assertRaises(ValueError, f.readline)
2334 self.assertRaises(ValueError, f.readlines)
2335 self.assertRaises(ValueError, f.seek, 0)
2336 self.assertRaises(ValueError, f.tell)
2337 self.assertRaises(ValueError, f.truncate)
2338 self.assertRaises(ValueError, f.write,
2339 b"" if "b" in kwargs['mode'] else "")
2340 self.assertRaises(ValueError, f.writelines, [])
2341 self.assertRaises(ValueError, next, f)
2342
2343 def test_blockingioerror(self):
2344 # Various BlockingIOError issues
2345 self.assertRaises(TypeError, self.BlockingIOError)
2346 self.assertRaises(TypeError, self.BlockingIOError, 1)
2347 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2348 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2349 b = self.BlockingIOError(1, "")
2350 self.assertEqual(b.characters_written, 0)
2351 class C(unicode):
2352 pass
2353 c = C("")
2354 b = self.BlockingIOError(1, c)
2355 c.b = b
2356 b.c = c
2357 wr = weakref.ref(c)
2358 del c, b
2359 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002360 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002361
2362 def test_abcs(self):
2363 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002364 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2365 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2366 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2367 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002368
2369 def _check_abc_inheritance(self, abcmodule):
2370 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002371 self.assertIsInstance(f, abcmodule.IOBase)
2372 self.assertIsInstance(f, abcmodule.RawIOBase)
2373 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2374 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002375 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002376 self.assertIsInstance(f, abcmodule.IOBase)
2377 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2378 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2379 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002380 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002381 self.assertIsInstance(f, abcmodule.IOBase)
2382 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2383 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2384 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002385
2386 def test_abc_inheritance(self):
2387 # Test implementations inherit from their respective ABCs
2388 self._check_abc_inheritance(self)
2389
2390 def test_abc_inheritance_official(self):
2391 # Test implementations inherit from the official ABCs of the
2392 # baseline "io" module.
2393 self._check_abc_inheritance(io)
2394
class CMiscIOTest(MiscIOTest):
    """Run MiscIOTest against ``io``, the C implementation."""
    io = io
2397
class PyMiscIOTest(MiscIOTest):
    """Run MiscIOTest against ``pyio`` (``_pyio``), the Python implementation."""
    io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002400
def test_main():
    """Bind every test class to its io implementation, then run them all.

    Classes whose name starts with "C" receive the members of ``io``, classes
    whose name starts with "Py" receive those of ``pyio``; each also gets the
    matching flavour of the mock classes.  Any other class is run unchanged.
    """
    tests = (
        CIOTest, PyIOTest,
        CBufferedReaderTest, PyBufferedReaderTest,
        CBufferedWriterTest, PyBufferedWriterTest,
        CBufferedRWPairTest, PyBufferedRWPairTest,
        CBufferedRandomTest, PyBufferedRandomTest,
        StatefulIncrementalDecoderTest,
        CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
        CTextIOWrapperTest, PyTextIOWrapperTest,
        CMiscIOTest, PyMiscIOTest,
    )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]

    c_io_ns = {}
    for member in all_members:
        c_io_ns[member] = getattr(io, member)
    py_io_ns = {}
    for member in all_members:
        py_io_ns[member] = getattr(pyio, member)

    # Each mock exists in a "C"-prefixed and a "Py"-prefixed flavour at module
    # level; expose the right one under the unprefixed name.
    module_globals = globals()
    for mock in mocks:
        c_io_ns[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mocks:
        py_io_ns[mock.__name__] = module_globals["Py" + mock.__name__]

    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        class_name = test.__name__
        if class_name.startswith("C"):
            namespace = c_io_ns
        elif class_name.startswith("Py"):
            namespace = py_io_ns
        else:
            # Implementation-independent test: nothing to inject.
            continue
        for attr, value in namespace.items():
            setattr(test, attr, value)

    support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002434
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()