blob: 0c335a3dd117b6434e0fc748f4ad675402b162f2 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)
12
################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import threading
30import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000031import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000032import warnings
33import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000034import abc
Georg Brandla4f46e12010-02-07 17:03:15 +000035from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000036from collections import deque
37from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000038
39import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
42
43__metaclass__ = type
44bytes = support.py3k_bytes
45
def _default_chunk_size():
    """Return the _CHUNK_SIZE of a freshly opened TextIOWrapper."""
    # Any readable text file works as a probe; this module's own source is
    # always available.
    probe = io.open(__file__, "r", encoding="latin1")
    try:
        return probe._CHUNK_SIZE
    finally:
        probe.close()
Christian Heimes1a6387e2008-03-26 12:49:49 +000050
51
class MockRawIO:
    """Scripted raw stream used to drive the buffered-IO tests.

    Reads are served from a queue of pre-baked chunks (``read_stack``) and
    every write is recorded in ``_write_stack``.  ``_reads`` counts how many
    times ``read()`` or ``readinto()`` has been called.  The class is a mixin:
    it is combined with a concrete ``RawIOBase`` by the subclasses below.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0

    def read(self, n=None):
        """Return the next scripted chunk (``n`` is ignored), or b"" when
        the script is exhausted."""
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script means EOF; any other error is a bug
            # in the test and must not be hidden.
            return b""

    def write(self, b):
        """Record a bytes copy of ``b`` and report it as fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy as much of the head chunk as fits into ``buf``.

        Returns the number of bytes copied, 0 when the script is exhausted,
        or None when the head entry is None (that entry is consumed).
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            # The whole chunk fits: consume it.
            del self._read_stack[0]
            buf[:n] = data
            return n
        else:
            # Partial read: leave the unread tail at the head of the script.
            buf[:] = data[:max_len]
            self._read_stack[0] = data[max_len:]
            return max_len

    def truncate(self, pos=None):
        return pos
110
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with RawIOBase from the C io module."""
113
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with RawIOBase from the Python _pyio module."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000116
117
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately bogus results."""

    def write(self, b):
        # Claim twice as many bytes as MockRawIO actually recorded.
        recorded = MockRawIO.write(self, b)
        return recorded * 2

    def read(self, n=None):
        # Hand back the scripted chunk duplicated.
        chunk = MockRawIO.read(self, n)
        return chunk * 2

    def seek(self, pos, whence):
        # A negative position is never valid.
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Fill the buffer normally, then report more bytes than it can hold.
        MockRawIO.readinto(self, buf)
        capacity = len(buf)
        return capacity * 5
134
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with RawIOBase from the C io module."""
137
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with RawIOBase from the Python _pyio module."""
140
141
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() call raises IOError."""

    # Class-level default; close() replaces it with a per-instance flag.
    closed = 0

    def close(self):
        # Later calls are silent no-ops; only the first one fails.
        if self.closed:
            return
        self.closed = 1
        raise IOError
149
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with RawIOBase from the C io module."""
152
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with RawIOBase from the Python _pyio module."""
155
156
class MockFileIO:
    """Mixin that logs the size of every read it forwards up the MRO.

    ``read_history`` receives one entry per read()/readinto() call: the
    number of bytes obtained, or None when the underlying call returned None.
    """

    def __init__(self, data):
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        chunk = super(MockFileIO, self).read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super(MockFileIO, self).readinto(b)
        self.read_history.append(count)
        return count
Christian Heimes1a6387e2008-03-26 12:49:49 +0000172
class CMockFileIO(MockFileIO, io.BytesIO):
    """Read-logging mixin layered over BytesIO from the C io module."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000175
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """Read-logging mixin layered over BytesIO from the Python _pyio module."""
178
179
class MockNonBlockWriterIO:
    """Raw-writer mixin that can be told to "block" on a chosen byte.

    Subclasses must provide a ``BlockingIOError`` attribute naming the
    exception class to raise.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and clear the record."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record ``b``; if the armed blocker occurs in it, record only the
        part before it and raise BlockingIOError with that length."""
        data = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                cut = data.index(blocker)
            except ValueError:
                # Blocker not present in this write: accept it in full below.
                pass
            else:
                # The blocker is one-shot: disarm it before raising.
                self._blocker_char = None
                self._write_stack.append(data[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(data)
        return len(data)
218
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """Blocking-writer mock bound to the C io module."""

    # Exception class raised by MockNonBlockWriterIO.write() when it "blocks".
    BlockingIOError = io.BlockingIOError
221
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """Blocking-writer mock bound to the Python _pyio module."""

    # Exception class raised by MockNonBlockWriterIO.write() when it "blocks".
    BlockingIOError = pyio.BlockingIOError
224
Christian Heimes1a6387e2008-03-26 12:49:49 +0000225
class IOTest(unittest.TestCase):
    """Implementation-agnostic tests of basic file and stream behaviour.

    The tests reach the implementation under test through attributes on
    ``self`` -- ``open``, ``FileIO``, ``BytesIO``, ``IOBase``, ``RawIOBase``,
    ``BufferedIOBase`` and ``TextIOBase``.  None of them is defined in this
    class; they have to be supplied on the concrete test classes.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Shared write/seek/truncate scenario.  On success the stream
        # contains b"hello world\n" and its position is 13.
        self.assertEqual(f.write(b"blah."), 5)
        # truncate() must not move the stream position.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Shared read/readinto/seek scenario for a stream that contains
        # b"hello world\n".  With buffered=True, argument-less read() is
        # exercised as well.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Short read at EOF: only the first two bytes are overwritten.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        # Shared scenario working at offsets around self.LARGE.
        # assertTrue rather than a bare assert, so the precondition is still
        # checked when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # truncate() must not move the stream position.
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a real skip instead of returning silently, which
                # the runner would count as a pass.
                self.skipTest(
                    "large file ops on %s require %d bytes and a long time; "
                    "use 'regrtest.py -u largefile test_io' to run it"
                    % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on leaving the with block, both on normal
        # exit and when the body raises.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1 // 0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1 // 0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Deleting the last reference must run __del__, close() and flush()
        # in that order, and the written data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    parent_del = super(MyFileIO, self).__del__
                except AttributeError:
                    pass
                else:
                    parent_del()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Same ordering check as test_destructor, for an abstract base class.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    parent_del = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    parent_del()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must report the byte length of the array, not its
        # element count.
        a = array.array(b'i', range(10))
        n = len(a.tostring())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Second file object over the same descriptor; closing it must
            # leave the descriptor itself open.
            alias = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(alias.read(), "egg\n")
            alias.seek(0)
            alias.close()
            self.assertRaises(ValueError, alias.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            alias = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(alias.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Self-reference: the object can only be reclaimed by the cycle
        # collector.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000541
class CIOTest(IOTest):
    """IOTest run unchanged; no test is overridden or disabled here."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000544
class PyIOTest(IOTest):
    """IOTest with the array-write test disabled."""

    @unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )
    def test_array_writes(self):
        IOTest.test_array_writes(self)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000549
550
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # The mixin reads three attributes that are not defined here and must be
    # supplied by the concrete test class: ``tp`` (the buffered type under
    # test), ``MockRawIO`` and ``CloseFailureIO``.

    def test_detach(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        # A second detach() must fail.
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        # Overridden __del__/close()/flush() must run in that order when the
        # object is collected; flush() is only expected for writable types.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    parent_del = super(MyBufferedIO, self).__del__
                except AttributeError:
                    pass
                else:
                    parent_del()
            def close(self):
                record.append(2)
                super(MyBufferedIO, self).close()
            def flush(self):
                record.append(3)
                super(MyBufferedIO, self).flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        # The repr includes the raw stream's name once it has one.
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000642
643
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    """Tests for a buffered reader type, shared by its implementations.

    Besides what CommonBufferedTests needs, the tests read ``tp``,
    ``MockRawIO``, ``MockFileIO``, ``MisbehavedRawIO`` and ``open`` from
    ``self``; none of them is defined in this class.
    """

    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be called again on a live object.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # rawio._reads shows whether each read1() call hit the raw stream.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # Short read: the second byte keeps its previous value.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")

    def test_readlines(self):
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to bufio.read(), and the
        # sizes the raw stream is then expected to have served.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

    def test_read_past_eof(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as fp:
                fp.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=reader) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Every byte value must appear exactly N times across all
                # the chunks the threads collected.
                combined = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(combined.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # Bogus seek()/tell() results from the raw stream must surface as
        # IOError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
800
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the BufferedReaderTest cases against io.BufferedReader and adds
    # a few checks that only apply to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each rejected re-initialization, read() must raise
        # ValueError as well.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Opening TESTFN in "w+b" mode creates the file; register its
        # removal so the test does not leave it behind.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.f = f  # reference cycle: only the cyclic GC can reclaim f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000841
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited BufferedReaderTest cases with `tp` bound to
    # pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000844
845
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Tests for buffered writers.  The class under test is read from the
    # `tp` attribute, which this class does not define itself; the
    # subclasses in this file set it.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called again with valid sizes; invalid sizes
        # raise ValueError and a later valid call makes writes work again.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push the pending data to the raw object.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # `intermediate_func` is called with the buffered object after
        # every single write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents,
            b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seeks that end up at the starting position must not disturb
        # the written data.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must write out the buffered data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN; register its removal so the file is
        # not left behind.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit the
        # "max_buffer_size is deprecated" DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001060
1061
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the BufferedWriterTest cases against io.BufferedWriter and adds
    # a few checks that only apply to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each rejected re-initialization, write() must raise
        # ValueError as well.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The test creates TESTFN; register its removal so the file is
        # not left behind.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        f.x = f  # reference cycle: only the cyclic GC can reclaim f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1099
1100
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the inherited BufferedWriterTest cases with `tp` bound to
    # pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001103
class BufferedRWPairTest(unittest.TestCase):
    # Tests for the reader/writer pair class.  The class under test is
    # read from the `tp` attribute, which the subclasses in this file set.

    def _mock_pair(self):
        # Pair built from two fresh MockRawIO objects (reader first).
        return self.tp(self.MockRawIO(), self.MockRawIO())

    def _reading_pair(self, data):
        # Pair whose reader is a BytesIO over `data` and whose writer is
        # a fresh MockRawIO.
        return self.tp(self.BytesIO(data), self.MockRawIO())

    def test_constructor(self):
        self.assertFalse(self._mock_pair().closed)

    def test_detach(self):
        pair = self._mock_pair()
        with self.assertRaises(self.UnsupportedOperation):
            pair.detach()

    def test_constructor_max_buffer_size_deprecation(self):
        expected_warning = ("max_buffer_size is deprecated",
                            DeprecationWarning)
        with support.check_warnings(expected_warning):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self._reading_pair(b"abcdef")
        for size, expected in ((3, b"abc"), (1, b"d")):
            self.assertEqual(pair.read(size), expected)
        self.assertEqual(pair.read(), b"ef")

        pair = self._reading_pair(b"abc")
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        def fresh_pair():
            return self._reading_pair(b"abc\ndef\nh")

        every_line = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(fresh_pair().readlines(), every_line)
        self.assertEqual(fresh_pair().readlines(), every_line)
        self.assertEqual(fresh_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self._reading_pair(b"abcdef")

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self._reading_pair(b"abcdef")

        target = bytearray(5)
        self.assertEqual(pair.readinto(target), 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), sink)

        for chunk in (b"abc", b"def"):
            pair.write(chunk)
            pair.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self._reading_pair(b"abcdef")

        peeked = pair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        self.assertTrue(self._mock_pair().readable())

    def test_writeable(self):
        self.assertTrue(self._mock_pair().writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        self.assertFalse(self._mock_pair().seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self._mock_pair()
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side does.
        combinations = (
            (False, False),
            (True, False),
            (False, True),
            (True, True),
        )
        for reader_tty, writer_tty in combinations:
            pair = self.tp(SelectableIsAtty(reader_tty),
                           SelectableIsAtty(writer_tty))
            if reader_tty or writer_tty:
                self.assertTrue(pair.isatty())
            else:
                self.assertFalse(pair.isatty())
1221
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the inherited BufferedRWPairTest cases with `tp` bound to
    # io.BufferedRWPair.
    tp = io.BufferedRWPair
1224
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the inherited BufferedRWPairTest cases with `tp` bound to
    # pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001227
1228
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Combines the reader and writer test suites and adds tests that mix
    # reads, writes and seeks on the same buffered object.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # `read_func(bufio[, n])` is the read primitive being exercised;
        # it is called both with and without a size.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        # Two variants: peek at the current position, then step back one
        # byte, peek, and restore the position.
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        def _seek_back_and_peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_seek_back_and_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)
1383
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
    # Runs the reader, writer and random-access suites with `tp` bound to
    # io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            buffered.__init__(raw, sys.maxsize)

    def test_garbage_collection(self):
        # Reader check first, then writer check.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1400
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the inherited BufferedRandomTest cases with `tp` bound to
    # pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1403
1404
Christian Heimes1a6387e2008-03-26 12:49:49 +00001405# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1406# properties:
1407# - A single output character can correspond to many bytes of input.
1408# - The number of input bytes to complete the character can be
1409# undetermined until the last input byte is received.
1410# - The number of input bytes can vary depending on previous input.
1411# - A single input byte can correspond to many characters of output.
1412# - The number of output characters can be undetermined until the
1413# last input byte is received.
1414# - The number of output characters can vary depending on previous input.
1415
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with flags encoding I and O."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes in `input` and return the text of every word
        completed by them.

        When `final` is true, a partially buffered word is completed too.
        """
        pieces = []
        period = ord('.')
        # Iterating a bytearray yields integers, whereas iterating a byte
        # string yields 1-character strings on some Python versions and
        # integers on others.  Integers also match the ord() comparisons
        # made in process_word().
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer: # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer and return the
        text it produces ('' for the 'i' and 'o' control words)."""
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The lookup function below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: return a CodecInfo for 'test_decoder'
        while codecEnabled is true, otherwise None."""
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
1500
# Register the previous decoder for testing.
# Disabled by default, tests will enable it.
# (lookupTestDecoder only returns a codec while codecEnabled is true.)
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1504
1505
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, final, expected in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(data, final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
1548
1549class TextIOWrapperTest(unittest.TestCase):
1550
    def setUp(self):
        # testdata mixes "\r\n", "\r" and "\n" line endings; normalized is
        # the same five lines as text with every ending written as "\n".
        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
        # Remove any TESTFN left over from an earlier test.
        support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001555
    def tearDown(self):
        # Remove TESTFN in case the test created it.
        support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001558
Antoine Pitrou19690592009-06-12 20:14:08 +00001559 def test_constructor(self):
1560 r = self.BytesIO(b"\xc3\xa9\n\n")
1561 b = self.BufferedReader(r, 1000)
1562 t = self.TextIOWrapper(b)
1563 t.__init__(b, encoding="latin1", newline="\r\n")
1564 self.assertEquals(t.encoding, "latin1")
1565 self.assertEquals(t.line_buffering, False)
1566 t.__init__(b, encoding="utf8", line_buffering=True)
1567 self.assertEquals(t.encoding, "utf8")
1568 self.assertEquals(t.line_buffering, True)
1569 self.assertEquals("\xe9\n", t.readline())
1570 self.assertRaises(TypeError, t.__init__, b, newline=42)
1571 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1572
1573 def test_detach(self):
1574 r = self.BytesIO()
1575 b = self.BufferedWriter(r)
1576 t = self.TextIOWrapper(b)
1577 self.assertIs(t.detach(), b)
1578
1579 t = self.TextIOWrapper(b, encoding="ascii")
1580 t.write("howdy")
1581 self.assertFalse(r.getvalue())
1582 t.detach()
1583 self.assertEqual(r.getvalue(), b"howdy")
1584 self.assertRaises(ValueError, t.detach)
1585
1586 def test_repr(self):
1587 raw = self.BytesIO("hello".encode("utf-8"))
1588 b = self.BufferedReader(raw)
1589 t = self.TextIOWrapper(b, encoding="utf-8")
1590 modname = self.TextIOWrapper.__module__
1591 self.assertEqual(repr(t),
1592 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1593 raw.name = "dummy"
1594 self.assertEqual(repr(t),
1595 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1596 raw.name = b"dummy"
1597 self.assertEqual(repr(t),
1598 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1599
1600 def test_line_buffering(self):
1601 r = self.BytesIO()
1602 b = self.BufferedWriter(r, 1000)
1603 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1604 t.write("X")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001605 self.assertEquals(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001606 t.write("Y\nZ")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001607 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001608 t.write("A\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001609 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1610
Antoine Pitrou19690592009-06-12 20:14:08 +00001611 def test_encoding(self):
1612 # Check the encoding attribute is always set, and valid
1613 b = self.BytesIO()
1614 t = self.TextIOWrapper(b, encoding="utf8")
1615 self.assertEqual(t.encoding, "utf8")
1616 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001617 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001618 codecs.lookup(t.encoding)
1619
1620 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001621 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001622 b = self.BytesIO(b"abc\n\xff\n")
1623 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001624 self.assertRaises(UnicodeError, t.read)
1625 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001626 b = self.BytesIO(b"abc\n\xff\n")
1627 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001628 self.assertRaises(UnicodeError, t.read)
1629 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001630 b = self.BytesIO(b"abc\n\xff\n")
1631 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001632 self.assertEquals(t.read(), "abc\n\n")
1633 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001634 b = self.BytesIO(b"abc\n\xff\n")
1635 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
1636 self.assertEquals(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001637
Antoine Pitrou19690592009-06-12 20:14:08 +00001638 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001639 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001640 b = self.BytesIO()
1641 t = self.TextIOWrapper(b, encoding="ascii")
1642 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001643 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001644 b = self.BytesIO()
1645 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1646 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001647 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001648 b = self.BytesIO()
1649 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001650 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001651 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001652 t.flush()
1653 self.assertEquals(b.getvalue(), b"abcdef\n")
1654 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001655 b = self.BytesIO()
1656 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001657 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001658 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001659 t.flush()
1660 self.assertEquals(b.getvalue(), b"abc?def\n")
1661
Antoine Pitrou19690592009-06-12 20:14:08 +00001662 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001663 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1664
1665 tests = [
1666 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1667 [ '', input_lines ],
1668 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1669 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1670 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1671 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001672 encodings = (
1673 'utf-8', 'latin-1',
1674 'utf-16', 'utf-16-le', 'utf-16-be',
1675 'utf-32', 'utf-32-le', 'utf-32-be',
1676 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001677
1678 # Try a range of buffer sizes to test the case where \r is the last
1679 # character in TextIOWrapper._pending_line.
1680 for encoding in encodings:
1681 # XXX: str.encode() should return bytes
1682 data = bytes(''.join(input_lines).encode(encoding))
1683 for do_reads in (False, True):
1684 for bufsize in range(1, 10):
1685 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001686 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1687 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001688 encoding=encoding)
1689 if do_reads:
1690 got_lines = []
1691 while True:
1692 c2 = textio.read(2)
1693 if c2 == '':
1694 break
1695 self.assertEquals(len(c2), 2)
1696 got_lines.append(c2 + textio.readline())
1697 else:
1698 got_lines = list(textio)
1699
1700 for got_line, exp_line in zip(got_lines, exp_lines):
1701 self.assertEquals(got_line, exp_line)
1702 self.assertEquals(len(got_lines), len(exp_lines))
1703
Antoine Pitrou19690592009-06-12 20:14:08 +00001704 def test_newlines_input(self):
1705 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001706 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1707 for newline, expected in [
1708 (None, normalized.decode("ascii").splitlines(True)),
1709 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001710 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1711 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1712 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001713 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001714 buf = self.BytesIO(testdata)
1715 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001716 self.assertEquals(txt.readlines(), expected)
1717 txt.seek(0)
1718 self.assertEquals(txt.read(), "".join(expected))
1719
Antoine Pitrou19690592009-06-12 20:14:08 +00001720 def test_newlines_output(self):
1721 testdict = {
1722 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1723 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1724 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1725 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1726 }
1727 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1728 for newline, expected in tests:
1729 buf = self.BytesIO()
1730 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1731 txt.write("AAA\nB")
1732 txt.write("BB\nCCC\n")
1733 txt.write("X\rY\r\nZ")
1734 txt.flush()
1735 self.assertEquals(buf.closed, False)
1736 self.assertEquals(buf.getvalue(), expected)
1737
1738 def test_destructor(self):
1739 l = []
1740 base = self.BytesIO
1741 class MyBytesIO(base):
1742 def close(self):
1743 l.append(self.getvalue())
1744 base.close(self)
1745 b = MyBytesIO()
1746 t = self.TextIOWrapper(b, encoding="ascii")
1747 t.write("abc")
1748 del t
1749 support.gc_collect()
1750 self.assertEquals([b"abc"], l)
1751
1752 def test_override_destructor(self):
1753 record = []
1754 class MyTextIO(self.TextIOWrapper):
1755 def __del__(self):
1756 record.append(1)
1757 try:
1758 f = super(MyTextIO, self).__del__
1759 except AttributeError:
1760 pass
1761 else:
1762 f()
1763 def close(self):
1764 record.append(2)
1765 super(MyTextIO, self).close()
1766 def flush(self):
1767 record.append(3)
1768 super(MyTextIO, self).flush()
1769 b = self.BytesIO()
1770 t = MyTextIO(b, encoding="ascii")
1771 del t
1772 support.gc_collect()
1773 self.assertEqual(record, [1, 2, 3])
1774
1775 def test_error_through_destructor(self):
1776 # Test that the exception state is not modified by a destructor,
1777 # even if close() fails.
1778 rawio = self.CloseFailureIO()
1779 def f():
1780 self.TextIOWrapper(rawio).xyzzy
1781 with support.captured_output("stderr") as s:
1782 self.assertRaises(AttributeError, f)
1783 s = s.getvalue().strip()
1784 if s:
1785 # The destructor *may* have printed an unraisable error, check it
1786 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001787 self.assertTrue(s.startswith("Exception IOError: "), s)
1788 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001789
1790 # Systematic tests of the text I/O API
1791
Antoine Pitrou19690592009-06-12 20:14:08 +00001792 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001793 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1794 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00001795 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001796 f._CHUNK_SIZE = chunksize
Antoine Pitrou19690592009-06-12 20:14:08 +00001797 self.assertEquals(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001798 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001799 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001800 f._CHUNK_SIZE = chunksize
1801 self.assertEquals(f.tell(), 0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001802 self.assertEquals(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001803 cookie = f.tell()
1804 self.assertEquals(f.seek(0), 0)
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001805 self.assertEquals(f.read(None), "abc")
1806 f.seek(0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001807 self.assertEquals(f.read(2), "ab")
1808 self.assertEquals(f.read(1), "c")
1809 self.assertEquals(f.read(1), "")
1810 self.assertEquals(f.read(), "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001811 self.assertEquals(f.tell(), cookie)
1812 self.assertEquals(f.seek(0), 0)
1813 self.assertEquals(f.seek(0, 2), cookie)
Antoine Pitrou19690592009-06-12 20:14:08 +00001814 self.assertEquals(f.write("def"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001815 self.assertEquals(f.seek(cookie), cookie)
Antoine Pitrou19690592009-06-12 20:14:08 +00001816 self.assertEquals(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001817 if enc.startswith("utf"):
1818 self.multi_line_test(f, enc)
1819 f.close()
1820
1821 def multi_line_test(self, f, enc):
1822 f.seek(0)
1823 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00001824 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001825 wlines = []
1826 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1827 chars = []
1828 for i in range(size):
1829 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00001830 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001831 wlines.append((f.tell(), line))
1832 f.write(line)
1833 f.seek(0)
1834 rlines = []
1835 while True:
1836 pos = f.tell()
1837 line = f.readline()
1838 if not line:
1839 break
1840 rlines.append((pos, line))
1841 self.assertEquals(rlines, wlines)
1842
Antoine Pitrou19690592009-06-12 20:14:08 +00001843 def test_telling(self):
1844 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001845 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001846 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001847 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001848 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001849 p2 = f.tell()
1850 f.seek(0)
1851 self.assertEquals(f.tell(), p0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001852 self.assertEquals(f.readline(), "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001853 self.assertEquals(f.tell(), p1)
Antoine Pitrou19690592009-06-12 20:14:08 +00001854 self.assertEquals(f.readline(), "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001855 self.assertEquals(f.tell(), p2)
1856 f.seek(0)
1857 for line in f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001858 self.assertEquals(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001859 self.assertRaises(IOError, f.tell)
1860 self.assertEquals(f.tell(), p2)
1861 f.close()
1862
Antoine Pitrou19690592009-06-12 20:14:08 +00001863 def test_seeking(self):
1864 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00001865 prefix_size = chunk_size - 2
1866 u_prefix = "a" * prefix_size
1867 prefix = bytes(u_prefix.encode("utf-8"))
1868 self.assertEquals(len(u_prefix), len(prefix))
1869 u_suffix = "\u8888\n"
1870 suffix = bytes(u_suffix.encode("utf-8"))
1871 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00001872 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001873 f.write(line*2)
1874 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001875 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001876 s = f.read(prefix_size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001877 self.assertEquals(s, prefix.decode("ascii"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001878 self.assertEquals(f.tell(), prefix_size)
1879 self.assertEquals(f.readline(), u_suffix)
1880
Antoine Pitrou19690592009-06-12 20:14:08 +00001881 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001882 # Regression test for a specific bug
1883 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00001884 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001885 f.write(data)
1886 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001887 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001888 f._CHUNK_SIZE # Just test that it exists
1889 f._CHUNK_SIZE = 2
1890 f.readline()
1891 f.tell()
1892
Antoine Pitrou19690592009-06-12 20:14:08 +00001893 def test_seek_and_tell(self):
1894 #Test seek/tell using the StatefulIncrementalDecoder.
1895 # Make test faster by doing smaller seeks
1896 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00001897
Antoine Pitrou19690592009-06-12 20:14:08 +00001898 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001899 """Tell/seek to various points within a data stream and ensure
1900 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00001901 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001902 f.write(data)
1903 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001904 f = self.open(support.TESTFN, encoding='test_decoder')
1905 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00001906 decoded = f.read()
1907 f.close()
1908
1909 for i in range(min_pos, len(decoded) + 1): # seek positions
1910 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00001911 f = self.open(support.TESTFN, encoding='test_decoder')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001912 self.assertEquals(f.read(i), decoded[:i])
1913 cookie = f.tell()
1914 self.assertEquals(f.read(j), decoded[i:i + j])
1915 f.seek(cookie)
1916 self.assertEquals(f.read(), decoded[i:])
1917 f.close()
1918
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001919 # Enable the test decoder.
1920 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00001921
1922 # Run the tests.
1923 try:
1924 # Try each test case.
1925 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00001926 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001927
1928 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00001929 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1930 offset = CHUNK_SIZE - len(input)//2
1931 prefix = b'.'*offset
1932 # Don't bother seeking into the prefix (takes too long).
1933 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00001934 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001935
1936 # Ensure our test decoder won't interfere with subsequent tests.
1937 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001938 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00001939
Antoine Pitrou19690592009-06-12 20:14:08 +00001940 def test_encoded_writes(self):
1941 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001942 tests = ("utf-16",
1943 "utf-16-le",
1944 "utf-16-be",
1945 "utf-32",
1946 "utf-32-le",
1947 "utf-32-be")
1948 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001949 buf = self.BytesIO()
1950 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001951 # Check if the BOM is written only once (see issue1753).
1952 f.write(data)
1953 f.write(data)
1954 f.seek(0)
1955 self.assertEquals(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00001956 f.seek(0)
1957 self.assertEquals(f.read(), data * 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001958 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1959
Antoine Pitrou19690592009-06-12 20:14:08 +00001960 def test_unreadable(self):
1961 class UnReadable(self.BytesIO):
1962 def readable(self):
1963 return False
1964 txt = self.TextIOWrapper(UnReadable())
1965 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001966
Antoine Pitrou19690592009-06-12 20:14:08 +00001967 def test_read_one_by_one(self):
1968 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001969 reads = ""
1970 while True:
1971 c = txt.read(1)
1972 if not c:
1973 break
1974 reads += c
1975 self.assertEquals(reads, "AA\nBB")
1976
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001977 def test_readlines(self):
1978 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
1979 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
1980 txt.seek(0)
1981 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
1982 txt.seek(0)
1983 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
1984
Christian Heimes1a6387e2008-03-26 12:49:49 +00001985 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00001986 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001987 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00001988 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001989 reads = ""
1990 while True:
1991 c = txt.read(128)
1992 if not c:
1993 break
1994 reads += c
1995 self.assertEquals(reads, "A"*127+"\nB")
1996
1997 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001998 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001999
2000 # read one char at a time
2001 reads = ""
2002 while True:
2003 c = txt.read(1)
2004 if not c:
2005 break
2006 reads += c
2007 self.assertEquals(reads, self.normalized)
2008
2009 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002010 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002011 txt._CHUNK_SIZE = 4
2012
2013 reads = ""
2014 while True:
2015 c = txt.read(4)
2016 if not c:
2017 break
2018 reads += c
2019 self.assertEquals(reads, self.normalized)
2020
2021 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002022 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002023 txt._CHUNK_SIZE = 4
2024
2025 reads = txt.read(4)
2026 reads += txt.read(4)
2027 reads += txt.readline()
2028 reads += txt.readline()
2029 reads += txt.readline()
2030 self.assertEquals(reads, self.normalized)
2031
2032 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002033 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002034 txt._CHUNK_SIZE = 4
2035
2036 reads = txt.read(4)
2037 reads += txt.read()
2038 self.assertEquals(reads, self.normalized)
2039
2040 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002041 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002042 txt._CHUNK_SIZE = 4
2043
2044 reads = txt.read(4)
2045 pos = txt.tell()
2046 txt.seek(0)
2047 txt.seek(pos)
2048 self.assertEquals(txt.read(4), "BBB\n")
2049
2050 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002051 buffer = self.BytesIO(self.testdata)
2052 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002053
2054 self.assertEqual(buffer.seekable(), txt.seekable())
2055
Antoine Pitrou19690592009-06-12 20:14:08 +00002056 @unittest.skip("Issue #6213 with incremental encoders")
2057 def test_append_bom(self):
2058 # The BOM is not written again when appending to a non-empty file
2059 filename = support.TESTFN
2060 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2061 with self.open(filename, 'w', encoding=charset) as f:
2062 f.write('aaa')
2063 pos = f.tell()
2064 with self.open(filename, 'rb') as f:
2065 self.assertEquals(f.read(), 'aaa'.encode(charset))
2066
2067 with self.open(filename, 'a', encoding=charset) as f:
2068 f.write('xxx')
2069 with self.open(filename, 'rb') as f:
2070 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
2071
2072 @unittest.skip("Issue #6213 with incremental encoders")
2073 def test_seek_bom(self):
2074 # Same test, but when seeking manually
2075 filename = support.TESTFN
2076 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2077 with self.open(filename, 'w', encoding=charset) as f:
2078 f.write('aaa')
2079 pos = f.tell()
2080 with self.open(filename, 'r+', encoding=charset) as f:
2081 f.seek(pos)
2082 f.write('zzz')
2083 f.seek(0)
2084 f.write('bbb')
2085 with self.open(filename, 'rb') as f:
2086 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2087
2088 def test_errors_property(self):
2089 with self.open(support.TESTFN, "w") as f:
2090 self.assertEqual(f.errors, "strict")
2091 with self.open(support.TESTFN, "w", errors="replace") as f:
2092 self.assertEqual(f.errors, "replace")
2093
2094
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002095 def test_threads_write(self):
2096 # Issue6750: concurrent writes could duplicate data
2097 event = threading.Event()
2098 with self.open(support.TESTFN, "w", buffering=1) as f:
2099 def run(n):
2100 text = "Thread%03d\n" % n
2101 event.wait()
2102 f.write(text)
2103 threads = [threading.Thread(target=lambda n=x: run(n))
2104 for x in range(20)]
2105 for t in threads:
2106 t.start()
2107 time.sleep(0.02)
2108 event.set()
2109 for t in threads:
2110 t.join()
2111 with self.open(support.TESTFN) as f:
2112 content = f.read()
2113 for n in range(20):
2114 self.assertEquals(content.count("Thread%03d\n" % n), 1)
2115
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest plus checks that its own comments tie to the C
    TextIOWrapper (failed re-initialization, garbage collection)."""

    def test_initialization(self):
        # After a failed __init__ the object must refuse to be used.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.read)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
        self.assertRaises(ValueError, t.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        b = self.BufferedWriter(rawio)
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("456def")
        # Reference cycle: only the garbage collector can free the object.
        t.x = t
        wr = weakref.ref(t)
        del t
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
2142
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Runs the inherited TextIOWrapperTest tests without adding any.

    NOTE(review): presumably the pure-Python flavour, judging by the name;
    the io attributes it tests are not assigned in this class.
    """
2145
2146
class IncrementalNewlineDecoderTest(unittest.TestCase):
    """Tests for IncrementalNewlineDecoder.

    The class under test is read from ``self.IncrementalNewlineDecoder``;
    it is not defined here and has to be supplied from outside.
    """

    def check_newline_decoding_utf8(self, decoder):
        """Check *decoder* on UTF-8 input split at awkward places.

        *decoder* is a newline decoder wrapping a UTF-8 incremental
        decoder.  Multi-byte characters and "\\r\\n" pairs are fed in
        pieces to check that partial input is buffered correctly.
        """
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        _check_decode(b'\xe8\xa2\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        # A truncated character is an error once the input is final.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_decode(b'\n', "\n")
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        """Feed *decoder* text one unit at a time and check ``newlines``.

        *encoding* names the codec used to encode the sample text before
        it is passed to *decoder*; when it is None the text is passed
        unencoded, one character at a time.
        """
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(b))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertIsNone(decoder.newlines)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        # reset() must also forget which newlines have been seen.
        decoder.reset()
        data = "abc"
        if encoder is not None:
            encoder.reset()
            data = encoder.encode(data)
        self.assertEqual(decoder.decode(data), "abc")
        self.assertIsNone(decoder.newlines)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(decoder, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 are not newlines and must pass through
            # untouched without being recorded in dec.newlines.
            self.assertIsNone(dec.newlines)
            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
            self.assertIsNone(dec.newlines)
            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
            self.assertIsNone(dec.newlines)
        dec = self.IncrementalNewlineDecoder(None, translate=False)
        _check(dec)
        dec = self.IncrementalNewlineDecoder(None, translate=True)
        _check(dec)
2252
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Runs the inherited IncrementalNewlineDecoderTest tests unchanged.

    NOTE(review): presumably the C flavour, judging by the name; the
    IncrementalNewlineDecoder attribute is not assigned in this class.
    """
2255
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Runs the inherited IncrementalNewlineDecoderTest tests unchanged.

    NOTE(review): presumably the pure-Python flavour, judging by the name;
    the IncrementalNewlineDecoder attribute is not assigned in this class.
    """
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002258
Christian Heimes1a6387e2008-03-26 12:49:49 +00002259
2260# XXX Tests for open()
2261
class MiscIOTest(unittest.TestCase):
    """Miscellaneous checks on the io namespace under test.

    The implementation is always reached through attributes of ``self``
    (``self.io``, ``self.open``, ``self.IOBase``, ``self.BlockingIOError``,
    ...), so the tests only make sense on a subclass that has them.
    """

    def tearDown(self):
        # Several tests create support.TESTFN; remove it after each test.
        support.unlink(support.TESTFN)

    def test___all__(self):
        """Every name listed in ``__all__`` exists and has the expected kind."""
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase), name)

    def test_attributes(self):
        """Check ``name`` and ``mode`` on each layer returned by open().

        Every file is opened in a ``with`` block so that a failing assertion
        cannot leave it open when tearDown() unlinks the test file.
        """
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # Second object on the same descriptor; closefd=False keeps the
            # descriptor owned by f.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        """Operations on a closed file raise ValueError in every mode."""
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek/read1/readinto are not present on every layer, hence the
            # hasattr() guards.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Pass the argument type matching the mode so that the only
            # possible failure is the closed-file check.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        """Constructor argument checks and cycle collection of the error."""
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(unicode):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Build a reference cycle between the exception and its argument; it
        # must be gone after a collection.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check which ABCs of *abcmodule* each kind of opened file matches.

        *abcmodule* is any object exposing ``IOBase``, ``RawIOBase``,
        ``BufferedIOBase`` and ``TextIOBase``.
        """
        # Unbuffered binary: raw only.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        # Buffered binary: buffered only.
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        # Text: text only.
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)
2397
class CMiscIOTest(MiscIOTest):
    # Run the MiscIOTest cases against the module bound to ``io``.
    io = io
2400
class PyMiscIOTest(MiscIOTest):
    # Run the MiscIOTest cases against the module bound to ``pyio``.
    io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002403
def test_main():
    """Attach the io namespaces to the test classes, then run them all.

    Classes whose name starts with "C" receive the names taken from ``io``;
    classes whose name starts with "Py" receive the ones taken from
    ``pyio``.  Classes with any other name are run without additions.
    """
    tests = (
        CIOTest, PyIOTest,
        CBufferedReaderTest, PyBufferedReaderTest,
        CBufferedWriterTest, PyBufferedWriterTest,
        CBufferedRWPairTest, PyBufferedRWPairTest,
        CBufferedRandomTest, PyBufferedRandomTest,
        StatefulIncrementalDecoderTest,
        CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
        CTextIOWrapperTest, PyTextIOWrapperTest,
        CMiscIOTest, PyMiscIOTest,
    )
    mocks = (
        MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
        MockNonBlockWriterIO,
    )

    # Public names of each implementation, plus one name that is needed by
    # the tests although it is not listed in __all__.
    exported = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in exported}
    py_io_ns = {name: getattr(pyio, name) for name in exported}

    # Each mock is registered under its plain name, bound to the "C"- or
    # "Py"-prefixed class found in this module's globals.
    module_globals = globals()
    for mock in mocks:
        c_io_ns[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mocks:
        py_io_ns[mock.__name__] = module_globals["Py" + mock.__name__]

    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002437
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()