blob: 0404c097ce058ee7a187d3c88628475e13adab8b [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import threading
30import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000031import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000032import warnings
33import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000034import abc
Georg Brandla4f46e12010-02-07 17:03:15 +000035from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000036from collections import deque
37from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000038
39import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
42
# Make every class statement in this module produce a new-style class under
# Python 2; the mock classes below rely on super(), which requires that.
__metaclass__ = type
# Rebind the name ``bytes`` for the rest of this module to the helper from
# test_support.
# NOTE(review): presumably this gives Python 3 style bytes() construction on
# Python 2 -- confirm against test_support.py3k_bytes.
bytes = support.py3k_bytes
45
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size.

    Opens this very source file in text mode through ``io.open`` and returns
    the ``_CHUNK_SIZE`` attribute of the resulting text file object.
    """
    with io.open(__file__, "r", encoding="latin1") as f:
        return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000050
51
class MockRawIO:
    """Scripted raw stream used to drive the buffered I/O tests.

    Reads are served from a list of pre-canned chunks (``_read_stack``),
    writes are recorded in ``_write_stack`` and ``_reads`` counts how many
    times read() or readinto() has been called.
    """

    def __init__(self, read_stack=()):
        # Copy into a list so chunks can be popped and replaced in place.
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0

    def read(self, n=None):
        # ``n`` is ignored: each call hands back the next scripted chunk
        # whole, and b"" once the script is exhausted.
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            return b""

    def write(self, b):
        # Record a bytes copy of the data and claim it was written entirely.
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        # Fill ``buf`` from the head of the script.  Returns the number of
        # bytes stored, 0 when the script is exhausted, or None when the
        # next scripted item is None.
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            # The whole chunk fits: consume it.
            del self._read_stack[0]
            buf[:n] = data
            return n
        else:
            # Only part of the chunk fits: keep the remainder for next time.
            buf[:] = data[:max_len]
            self._read_stack[0] = data[max_len:]
            return max_len

    def truncate(self, pos=None):
        return pos
110
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with RawIOBase from the C ``io`` module."""
113
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with RawIOBase from the pure-Python ``_pyio``."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000116
117
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods return deliberately bogus values."""

    def write(self, b):
        # Record the data normally but report twice the real length.
        return MockRawIO.write(self, b) * 2

    def read(self, n=None):
        # Hand back the scripted chunk repeated twice.
        return MockRawIO.read(self, n) * 2

    def seek(self, pos, whence):
        # A negative position.
        return -123

    def tell(self):
        # A negative position.
        return -456

    def readinto(self, buf):
        # Fill the buffer normally but report five times its length.
        MockRawIO.readinto(self, buf)
        return len(buf) * 5
134
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with RawIOBase from the C ``io`` module."""
137
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with RawIOBase from pure-Python ``_pyio``."""
140
141
class CloseFailureIO(MockRawIO):
    """MockRawIO variant whose first close() call raises IOError."""

    # Flag flipped by the first close(); later calls are silent no-ops.
    closed = 0

    def close(self):
        if not self.closed:
            # Mark as closed *before* failing so a repeated close() (for
            # instance from a destructor) does not raise again.
            self.closed = 1
            raise IOError
149
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with RawIOBase from the C ``io`` module."""
152
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with RawIOBase from pure-Python ``_pyio``."""
155
156
class MockFileIO:
    """Mixin that records the size of every read made on the stream.

    It delegates to the next class in the MRO through super(), so it must be
    combined with a real stream class.  ``read_history`` receives one entry
    per read()/readinto() call.
    """

    def __init__(self, data):
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        res = super(MockFileIO, self).read(n)
        # Log the length of the result, or None when the read returned None.
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        res = super(MockFileIO, self).readinto(b)
        # readinto() already returns a count (or None); log it unchanged.
        self.read_history.append(res)
        return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000172
class CMockFileIO(MockFileIO, io.BytesIO):
    """Read-recording mixin on top of BytesIO from the C ``io`` module."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000175
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """Read-recording mixin on top of BytesIO from pure-Python ``_pyio``."""
178
179
class MockNonBlockWriterIO:
    """Raw writer that can be told to "block" on a chosen byte.

    Written data accumulates in ``_write_stack``.  After block_on(char), the
    first write() whose data contains ``char`` stores only the bytes before
    it and raises ``self.BlockingIOError``.  That exception class is not
    defined here; it has to be supplied as an attribute (the subclasses in
    this file set it).
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        # Return everything written so far and reset the record.
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        b = bytes(b)
        n = -1
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                # Blocker not in this chunk: fall through to a full write.
                pass
            else:
                # One-shot: disarm the blocker, keep the part before it and
                # report how many bytes were accepted.
                self._blocker_char = None
                self._write_stack.append(b[:n])
                raise self.BlockingIOError(0, "test blocking", n)
        self._write_stack.append(b)
        return len(b)
218
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """Blocking-writer mock bound to the C ``io`` module."""

    # Exception class raised by MockNonBlockWriterIO.write().
    BlockingIOError = io.BlockingIOError
221
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """Blocking-writer mock bound to the pure-Python ``_pyio`` module."""

    # Exception class raised by MockNonBlockWriterIO.write().
    BlockingIOError = pyio.BlockingIOError
224
Christian Heimes1a6387e2008-03-26 12:49:49 +0000225
class IOTest(unittest.TestCase):
    """Tests of open(), FileIO, BytesIO and the IOBase destructors.

    The implementation under test is reached through attributes such as
    ``self.open``, ``self.FileIO``, ``self.BytesIO`` and ``self.IOBase``.
    They are not defined in this class and must be supplied elsewhere.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Helper: run a fixed write/seek/truncate scenario on ``f``.  The
        # stream ends up holding b"hello world\n".
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must leave the stream position untouched.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Helper: read back the b"hello world\n" content produced by
        # write_ops().  With buffered=True, argument-less read() is
        # exercised as well.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large file tests: just past the signed 32-bit range.
    LARGE = 2**31

    def large_file_ops(self, f):
        # Helper: seek/write/truncate around the LARGE offset.
        # assertTrue rather than a bare assert, which is removed under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # truncate() must leave the stream position untouched.
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a real skip (as test_unbounded_file does) instead of
                # printing to stderr and silently passing.
                self.skipTest(
                    "requires %d bytes and a long time on %s; use "
                    "'regrtest.py -u largefile test_io' to run it"
                    % (self.LARGE, sys.platform))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1 // 0
            except ZeroDivisionError:
                # The file must be closed even when the body raised.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1 // 0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Destroying a FileIO subclass must call __del__, close() and
        # flush() in that order, and the data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyFileIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Helper: destroying a subclass of ``base`` must call __del__,
        # close() and flush() in that order.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must report the byte length of the array, for both the
        # unbuffered and the buffered file.
        a = array.array(b'i', range(10))
        n = len(a.tostring())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Second file object sharing f's descriptor without owning it.
            fp = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fp.read(), "egg\n")
            fp.seek(0)
            fp.close()
            self.assertRaises(ValueError, fp.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            fp = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fp.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Create a reference cycle so only the garbage collector can free f.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000541
class CIOTest(IOTest):
    """IOTest variant for the C implementation of io."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000544
class PyIOTest(IOTest):
    """IOTest variant for the pure-Python implementation of io."""

    # Re-bind the inherited test wrapped in unittest.skip so it is reported
    # as skipped for this implementation only.
    test_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000549
550
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # This is a mixin: it relies on the concrete test class for the
    # unittest assertion methods and for ``self.tp`` (the buffered class
    # under test), ``self.MockRawIO`` and ``self.CloseFailureIO``.

    def test_detach(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        # A second detach() on the same object must fail.
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        # Destroying a subclass must call __del__ and close(), plus flush()
        # when the object is writable, in that order.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyBufferedIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyBufferedIO, self).close()
            def flush(self):
                record.append(3)
                super(MyBufferedIO, self).flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        # Query before deletion; the object is gone afterwards.
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        # The repr shows the raw stream's name; a unicode name and a bytes
        # name are rendered differently.
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000642
643
Antoine Pitrou19690592009-06-12 20:14:08 +0000644class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
645 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000646
Antoine Pitrou19690592009-06-12 20:14:08 +0000647 def test_constructor(self):
648 rawio = self.MockRawIO([b"abc"])
649 bufio = self.tp(rawio)
650 bufio.__init__(rawio)
651 bufio.__init__(rawio, buffer_size=1024)
652 bufio.__init__(rawio, buffer_size=16)
653 self.assertEquals(b"abc", bufio.read())
654 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
655 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
656 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
657 rawio = self.MockRawIO([b"abc"])
658 bufio.__init__(rawio)
659 self.assertEquals(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000660
Antoine Pitrou19690592009-06-12 20:14:08 +0000661 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000662 for arg in (None, 7):
663 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
664 bufio = self.tp(rawio)
665 self.assertEquals(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000666 # Invalid args
667 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000668
Antoine Pitrou19690592009-06-12 20:14:08 +0000669 def test_read1(self):
670 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
671 bufio = self.tp(rawio)
672 self.assertEquals(b"a", bufio.read(1))
673 self.assertEquals(b"b", bufio.read1(1))
674 self.assertEquals(rawio._reads, 1)
675 self.assertEquals(b"c", bufio.read1(100))
676 self.assertEquals(rawio._reads, 1)
677 self.assertEquals(b"d", bufio.read1(100))
678 self.assertEquals(rawio._reads, 2)
679 self.assertEquals(b"efg", bufio.read1(100))
680 self.assertEquals(rawio._reads, 3)
681 self.assertEquals(b"", bufio.read1(100))
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000682 self.assertEquals(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000683 # Invalid args
684 self.assertRaises(ValueError, bufio.read1, -1)
685
686 def test_readinto(self):
687 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
688 bufio = self.tp(rawio)
689 b = bytearray(2)
690 self.assertEquals(bufio.readinto(b), 2)
691 self.assertEquals(b, b"ab")
692 self.assertEquals(bufio.readinto(b), 2)
693 self.assertEquals(b, b"cd")
694 self.assertEquals(bufio.readinto(b), 2)
695 self.assertEquals(b, b"ef")
696 self.assertEquals(bufio.readinto(b), 1)
697 self.assertEquals(b, b"gf")
698 self.assertEquals(bufio.readinto(b), 0)
699 self.assertEquals(b, b"gf")
700
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000701 def test_readlines(self):
702 def bufio():
703 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
704 return self.tp(rawio)
705 self.assertEquals(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
706 self.assertEquals(bufio().readlines(5), [b"abc\n", b"d\n"])
707 self.assertEquals(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
708
Antoine Pitrou19690592009-06-12 20:14:08 +0000709 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000710 data = b"abcdefghi"
711 dlen = len(data)
712
713 tests = [
714 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
715 [ 100, [ 3, 3, 3], [ dlen ] ],
716 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
717 ]
718
719 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000720 rawio = self.MockFileIO(data)
721 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000722 pos = 0
723 for nbytes in buf_read_sizes:
724 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
725 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000726 # this is mildly implementation-dependent
Christian Heimes1a6387e2008-03-26 12:49:49 +0000727 self.assertEquals(rawio.read_history, raw_read_sizes)
728
Antoine Pitrou19690592009-06-12 20:14:08 +0000729 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000730 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000731 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
732 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000733
734 self.assertEquals(b"abcd", bufio.read(6))
735 self.assertEquals(b"e", bufio.read(1))
736 self.assertEquals(b"fg", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000737 self.assertEquals(b"", bufio.peek(1))
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000738 self.assertTrue(None is bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000739 self.assertEquals(b"", bufio.read())
740
Antoine Pitrou19690592009-06-12 20:14:08 +0000741 def test_read_past_eof(self):
742 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
743 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000744
745 self.assertEquals(b"abcdefg", bufio.read(9000))
746
Antoine Pitrou19690592009-06-12 20:14:08 +0000747 def test_read_all(self):
748 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
749 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000750
751 self.assertEquals(b"abcdefg", bufio.read())
752
def test_threads(self):
    try:
        # Build a payload holding exactly the same number of 0's, 1's...
        # 255's.  Counting them afterwards shows whether concurrent
        # reading duplicated or forgot any contents.
        N = 1000
        values = list(range(256)) * N
        random.shuffle(values)
        payload = bytes(bytearray(values))
        with self.open(support.TESTFN, "wb") as f:
            f.write(payload)
        with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []
            results = []

            def reader():
                try:
                    # Intra-buffer read then buffer-flushing read
                    for size in cycle([1, 19]):
                        chunk = bufio.read(size)
                        if not chunk:
                            break
                        # list.append() is atomic
                        results.append(chunk)
                except Exception as e:
                    # Record the failure so the main thread can report it.
                    errors.append(e)
                    raise

            threads = [threading.Thread(target=reader) for _ in range(20)]
            for t in threads:
                t.start()
            time.sleep(0.02) # yield
            for t in threads:
                t.join()
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            collected = b''.join(results)
            for i in range(256):
                self.assertEqual(collected.count(bytes(bytearray([i]))), N)
    finally:
        support.unlink(support.TESTFN)
794
def test_misbehaved_io(self):
    # seek() and tell() on top of a MisbehavedRawIO stream are both
    # expected to raise IOError.
    bufio = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
    for method, args in ((bufio.seek, (0,)), (bufio.tell, ())):
        self.assertRaises(IOError, method, *args)
800
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the shared reader tests against io.BufferedReader and adds
    # checks specific to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        # After each rejected re-initialisation, reading must fail as well.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The test creates TESTFN on disk; make sure it is removed again.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.f = f  # reference cycle, so only the garbage collector can free it
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000841
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared reader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000844
845
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Implementation-independent tests for buffered writers.  Subclasses
    # select the class under test through the ``tp`` attribute.
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push buffered data down to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every single write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents,
            b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush the pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN on disk; make sure it is removed again.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit exactly one
        # DeprecationWarning with a fixed message.
        with support.check_warnings() as w:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), 8, 12)
            self.assertEqual(len(w.warnings), 1)
            warning = w.warnings[0]
            self.assertIs(warning.category, DeprecationWarning)
            self.assertEqual(str(warning.message),
                             "max_buffer_size is deprecated")
1065
1066
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the shared writer tests against io.BufferedWriter and adds
    # checks specific to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # After each rejected re-initialisation, writing must fail as well.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The test creates TESTFN on disk; make sure it is removed again.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        f.x = f  # reference cycle, so only the garbage collector can free it
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1104
1105
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared writer tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001108
class BufferedRWPairTest(unittest.TestCase):
    # Implementation-independent tests for BufferedRWPair.  Subclasses
    # select the class under test through the ``tp`` attribute.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must emit exactly one
        # DeprecationWarning with a fixed message.
        with support.check_warnings() as w:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
            self.assertEqual(len(w.warnings), 1)
            warning = w.warnings[0]
            self.assertIs(warning.category, DeprecationWarning)
            self.assertEqual(str(warning.message),
                             "max_buffer_size is deprecated")

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # read(None) must behave like read() without an argument.
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # Every call gets a fresh pair so that each one starts at offset 0.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() must not consume: the following read() sees the same bytes.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side does.
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
1231
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared pair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
1234
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared pair tests against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001237
1238
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Tests for buffered objects that both read and write.  The reader and
    # writer test suites are inherited and combined where both define the
    # same test.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # read_func(bufio[, n]) must return the bytes read and advance the
        # position accordingly.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)
1393
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
    # Runs the reader, writer and random-access tests against
    # io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        self.assertRaises((OverflowError, MemoryError, ValueError),
            buffered.__init__, raw, sys.maxsize)

    def test_garbage_collection(self):
        # Run the reader check first, then the writer check.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1409 CBufferedWriterTest.test_garbage_collection(self)
1410
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared random-access tests against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1413
1414
Christian Heimes1a6387e2008-03-26 12:49:49 +00001415# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1416# properties:
1417# - A single output character can correspond to many bytes of input.
1418# - The number of input bytes to complete the character can be
1419# undetermined until the last input byte is received.
1420# - The number of input bytes can vary depending on previous input.
1421# - A single input byte can correspond to many characters of output.
1422# - The number of output characters can be undetermined until the
1423# last input byte is received.
1424# - The number of output characters can vary depending on previous input.
1425
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and an empty buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered input bytes, flags) with flags = i*100 + o."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        # ``flags`` rather than ``io`` so the local does not shadow the
        # name of the module being tested.
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes in *input* and return the decoded text.

        When *final* is true, a partially buffered word is emitted as if
        it had been terminated.
        """
        period = ord('.')
        pieces = []
        # Iterating a bytearray yields integers regardless of whether
        # iterating the original bytes object yields integers or 1-char
        # strings, so the period comparison below always works.
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer: # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        Control words ('i...' and 'o...') update the lengths and produce
        no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Tests flip this switch to make the 'test_decoder' codec resolvable.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function; returns None unless enabled and matching."""
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
1510
1511# Register the previous decoder for testing.
1512# Disabled by default, tests will enable it.
1513codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1514
1515
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', 1), 'abcd.')
1558
1559class TextIOWrapperTest(unittest.TestCase):
1560
def setUp(self):
    # Start from a clean slate, then prepare sample data that mixes
    # \r\n, \r and \n line endings together with its \n-only counterpart.
    support.unlink(support.TESTFN)
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001565
def tearDown(self):
    # Remove the scratch file a test may have left behind.
    support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001568
def test_constructor(self):
    r = self.BytesIO(b"\xc3\xa9\n\n")
    b = self.BufferedReader(r, 1000)
    t = self.TextIOWrapper(b)
    # Re-running __init__ must replace the previous settings.
    t.__init__(b, encoding="latin1", newline="\r\n")
    self.assertEqual(t.encoding, "latin1")
    self.assertEqual(t.line_buffering, False)
    t.__init__(b, encoding="utf8", line_buffering=True)
    self.assertEqual(t.encoding, "utf8")
    self.assertEqual(t.line_buffering, True)
    self.assertEqual("\xe9\n", t.readline())
    # Invalid newline arguments are rejected by type and by value.
    self.assertRaises(TypeError, t.__init__, b, newline=42)
    self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1581 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1582
1583 def test_detach(self):
1584 r = self.BytesIO()
1585 b = self.BufferedWriter(r)
1586 t = self.TextIOWrapper(b)
1587 self.assertIs(t.detach(), b)
1588
1589 t = self.TextIOWrapper(b, encoding="ascii")
1590 t.write("howdy")
1591 self.assertFalse(r.getvalue())
1592 t.detach()
1593 self.assertEqual(r.getvalue(), b"howdy")
1594 self.assertRaises(ValueError, t.detach)
1595
1596 def test_repr(self):
1597 raw = self.BytesIO("hello".encode("utf-8"))
1598 b = self.BufferedReader(raw)
1599 t = self.TextIOWrapper(b, encoding="utf-8")
1600 modname = self.TextIOWrapper.__module__
1601 self.assertEqual(repr(t),
1602 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1603 raw.name = "dummy"
1604 self.assertEqual(repr(t),
1605 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1606 raw.name = b"dummy"
1607 self.assertEqual(repr(t),
1608 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1609
1610 def test_line_buffering(self):
1611 r = self.BytesIO()
1612 b = self.BufferedWriter(r, 1000)
1613 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1614 t.write("X")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001615 self.assertEquals(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001616 t.write("Y\nZ")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001617 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001618 t.write("A\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001619 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1620
Antoine Pitrou19690592009-06-12 20:14:08 +00001621 def test_encoding(self):
1622 # Check the encoding attribute is always set, and valid
1623 b = self.BytesIO()
1624 t = self.TextIOWrapper(b, encoding="utf8")
1625 self.assertEqual(t.encoding, "utf8")
1626 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001627 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001628 codecs.lookup(t.encoding)
1629
1630 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001631 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001632 b = self.BytesIO(b"abc\n\xff\n")
1633 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001634 self.assertRaises(UnicodeError, t.read)
1635 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001636 b = self.BytesIO(b"abc\n\xff\n")
1637 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001638 self.assertRaises(UnicodeError, t.read)
1639 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001640 b = self.BytesIO(b"abc\n\xff\n")
1641 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001642 self.assertEquals(t.read(), "abc\n\n")
1643 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001644 b = self.BytesIO(b"abc\n\xff\n")
1645 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
1646 self.assertEquals(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001647
Antoine Pitrou19690592009-06-12 20:14:08 +00001648 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001649 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001650 b = self.BytesIO()
1651 t = self.TextIOWrapper(b, encoding="ascii")
1652 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001653 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001654 b = self.BytesIO()
1655 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1656 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001657 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001658 b = self.BytesIO()
1659 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001660 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001661 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001662 t.flush()
1663 self.assertEquals(b.getvalue(), b"abcdef\n")
1664 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001665 b = self.BytesIO()
1666 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001667 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001668 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001669 t.flush()
1670 self.assertEquals(b.getvalue(), b"abc?def\n")
1671
Antoine Pitrou19690592009-06-12 20:14:08 +00001672 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001673 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1674
1675 tests = [
1676 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1677 [ '', input_lines ],
1678 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1679 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1680 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1681 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001682 encodings = (
1683 'utf-8', 'latin-1',
1684 'utf-16', 'utf-16-le', 'utf-16-be',
1685 'utf-32', 'utf-32-le', 'utf-32-be',
1686 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001687
1688 # Try a range of buffer sizes to test the case where \r is the last
1689 # character in TextIOWrapper._pending_line.
1690 for encoding in encodings:
1691 # XXX: str.encode() should return bytes
1692 data = bytes(''.join(input_lines).encode(encoding))
1693 for do_reads in (False, True):
1694 for bufsize in range(1, 10):
1695 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001696 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1697 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001698 encoding=encoding)
1699 if do_reads:
1700 got_lines = []
1701 while True:
1702 c2 = textio.read(2)
1703 if c2 == '':
1704 break
1705 self.assertEquals(len(c2), 2)
1706 got_lines.append(c2 + textio.readline())
1707 else:
1708 got_lines = list(textio)
1709
1710 for got_line, exp_line in zip(got_lines, exp_lines):
1711 self.assertEquals(got_line, exp_line)
1712 self.assertEquals(len(got_lines), len(exp_lines))
1713
Antoine Pitrou19690592009-06-12 20:14:08 +00001714 def test_newlines_input(self):
1715 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001716 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1717 for newline, expected in [
1718 (None, normalized.decode("ascii").splitlines(True)),
1719 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001720 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1721 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1722 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001723 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001724 buf = self.BytesIO(testdata)
1725 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001726 self.assertEquals(txt.readlines(), expected)
1727 txt.seek(0)
1728 self.assertEquals(txt.read(), "".join(expected))
1729
Antoine Pitrou19690592009-06-12 20:14:08 +00001730 def test_newlines_output(self):
1731 testdict = {
1732 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1733 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1734 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1735 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1736 }
1737 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1738 for newline, expected in tests:
1739 buf = self.BytesIO()
1740 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1741 txt.write("AAA\nB")
1742 txt.write("BB\nCCC\n")
1743 txt.write("X\rY\r\nZ")
1744 txt.flush()
1745 self.assertEquals(buf.closed, False)
1746 self.assertEquals(buf.getvalue(), expected)
1747
1748 def test_destructor(self):
1749 l = []
1750 base = self.BytesIO
1751 class MyBytesIO(base):
1752 def close(self):
1753 l.append(self.getvalue())
1754 base.close(self)
1755 b = MyBytesIO()
1756 t = self.TextIOWrapper(b, encoding="ascii")
1757 t.write("abc")
1758 del t
1759 support.gc_collect()
1760 self.assertEquals([b"abc"], l)
1761
1762 def test_override_destructor(self):
1763 record = []
1764 class MyTextIO(self.TextIOWrapper):
1765 def __del__(self):
1766 record.append(1)
1767 try:
1768 f = super(MyTextIO, self).__del__
1769 except AttributeError:
1770 pass
1771 else:
1772 f()
1773 def close(self):
1774 record.append(2)
1775 super(MyTextIO, self).close()
1776 def flush(self):
1777 record.append(3)
1778 super(MyTextIO, self).flush()
1779 b = self.BytesIO()
1780 t = MyTextIO(b, encoding="ascii")
1781 del t
1782 support.gc_collect()
1783 self.assertEqual(record, [1, 2, 3])
1784
1785 def test_error_through_destructor(self):
1786 # Test that the exception state is not modified by a destructor,
1787 # even if close() fails.
1788 rawio = self.CloseFailureIO()
1789 def f():
1790 self.TextIOWrapper(rawio).xyzzy
1791 with support.captured_output("stderr") as s:
1792 self.assertRaises(AttributeError, f)
1793 s = s.getvalue().strip()
1794 if s:
1795 # The destructor *may* have printed an unraisable error, check it
1796 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001797 self.assertTrue(s.startswith("Exception IOError: "), s)
1798 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001799
1800 # Systematic tests of the text I/O API
1801
Antoine Pitrou19690592009-06-12 20:14:08 +00001802 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001803 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1804 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00001805 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001806 f._CHUNK_SIZE = chunksize
Antoine Pitrou19690592009-06-12 20:14:08 +00001807 self.assertEquals(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001808 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001809 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001810 f._CHUNK_SIZE = chunksize
1811 self.assertEquals(f.tell(), 0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001812 self.assertEquals(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001813 cookie = f.tell()
1814 self.assertEquals(f.seek(0), 0)
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001815 self.assertEquals(f.read(None), "abc")
1816 f.seek(0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001817 self.assertEquals(f.read(2), "ab")
1818 self.assertEquals(f.read(1), "c")
1819 self.assertEquals(f.read(1), "")
1820 self.assertEquals(f.read(), "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001821 self.assertEquals(f.tell(), cookie)
1822 self.assertEquals(f.seek(0), 0)
1823 self.assertEquals(f.seek(0, 2), cookie)
Antoine Pitrou19690592009-06-12 20:14:08 +00001824 self.assertEquals(f.write("def"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001825 self.assertEquals(f.seek(cookie), cookie)
Antoine Pitrou19690592009-06-12 20:14:08 +00001826 self.assertEquals(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001827 if enc.startswith("utf"):
1828 self.multi_line_test(f, enc)
1829 f.close()
1830
1831 def multi_line_test(self, f, enc):
1832 f.seek(0)
1833 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00001834 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001835 wlines = []
1836 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1837 chars = []
1838 for i in range(size):
1839 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00001840 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001841 wlines.append((f.tell(), line))
1842 f.write(line)
1843 f.seek(0)
1844 rlines = []
1845 while True:
1846 pos = f.tell()
1847 line = f.readline()
1848 if not line:
1849 break
1850 rlines.append((pos, line))
1851 self.assertEquals(rlines, wlines)
1852
Antoine Pitrou19690592009-06-12 20:14:08 +00001853 def test_telling(self):
1854 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001855 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001856 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001857 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001858 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001859 p2 = f.tell()
1860 f.seek(0)
1861 self.assertEquals(f.tell(), p0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001862 self.assertEquals(f.readline(), "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001863 self.assertEquals(f.tell(), p1)
Antoine Pitrou19690592009-06-12 20:14:08 +00001864 self.assertEquals(f.readline(), "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001865 self.assertEquals(f.tell(), p2)
1866 f.seek(0)
1867 for line in f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001868 self.assertEquals(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001869 self.assertRaises(IOError, f.tell)
1870 self.assertEquals(f.tell(), p2)
1871 f.close()
1872
Antoine Pitrou19690592009-06-12 20:14:08 +00001873 def test_seeking(self):
1874 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00001875 prefix_size = chunk_size - 2
1876 u_prefix = "a" * prefix_size
1877 prefix = bytes(u_prefix.encode("utf-8"))
1878 self.assertEquals(len(u_prefix), len(prefix))
1879 u_suffix = "\u8888\n"
1880 suffix = bytes(u_suffix.encode("utf-8"))
1881 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00001882 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001883 f.write(line*2)
1884 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001885 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001886 s = f.read(prefix_size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001887 self.assertEquals(s, prefix.decode("ascii"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001888 self.assertEquals(f.tell(), prefix_size)
1889 self.assertEquals(f.readline(), u_suffix)
1890
Antoine Pitrou19690592009-06-12 20:14:08 +00001891 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001892 # Regression test for a specific bug
1893 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00001894 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001895 f.write(data)
1896 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001897 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001898 f._CHUNK_SIZE # Just test that it exists
1899 f._CHUNK_SIZE = 2
1900 f.readline()
1901 f.tell()
1902
Antoine Pitrou19690592009-06-12 20:14:08 +00001903 def test_seek_and_tell(self):
1904 #Test seek/tell using the StatefulIncrementalDecoder.
1905 # Make test faster by doing smaller seeks
1906 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00001907
Antoine Pitrou19690592009-06-12 20:14:08 +00001908 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001909 """Tell/seek to various points within a data stream and ensure
1910 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00001911 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001912 f.write(data)
1913 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001914 f = self.open(support.TESTFN, encoding='test_decoder')
1915 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00001916 decoded = f.read()
1917 f.close()
1918
1919 for i in range(min_pos, len(decoded) + 1): # seek positions
1920 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00001921 f = self.open(support.TESTFN, encoding='test_decoder')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001922 self.assertEquals(f.read(i), decoded[:i])
1923 cookie = f.tell()
1924 self.assertEquals(f.read(j), decoded[i:i + j])
1925 f.seek(cookie)
1926 self.assertEquals(f.read(), decoded[i:])
1927 f.close()
1928
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001929 # Enable the test decoder.
1930 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00001931
1932 # Run the tests.
1933 try:
1934 # Try each test case.
1935 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00001936 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001937
1938 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00001939 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1940 offset = CHUNK_SIZE - len(input)//2
1941 prefix = b'.'*offset
1942 # Don't bother seeking into the prefix (takes too long).
1943 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00001944 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001945
1946 # Ensure our test decoder won't interfere with subsequent tests.
1947 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001948 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00001949
Antoine Pitrou19690592009-06-12 20:14:08 +00001950 def test_encoded_writes(self):
1951 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001952 tests = ("utf-16",
1953 "utf-16-le",
1954 "utf-16-be",
1955 "utf-32",
1956 "utf-32-le",
1957 "utf-32-be")
1958 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001959 buf = self.BytesIO()
1960 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001961 # Check if the BOM is written only once (see issue1753).
1962 f.write(data)
1963 f.write(data)
1964 f.seek(0)
1965 self.assertEquals(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00001966 f.seek(0)
1967 self.assertEquals(f.read(), data * 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001968 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1969
Antoine Pitrou19690592009-06-12 20:14:08 +00001970 def test_unreadable(self):
1971 class UnReadable(self.BytesIO):
1972 def readable(self):
1973 return False
1974 txt = self.TextIOWrapper(UnReadable())
1975 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001976
Antoine Pitrou19690592009-06-12 20:14:08 +00001977 def test_read_one_by_one(self):
1978 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001979 reads = ""
1980 while True:
1981 c = txt.read(1)
1982 if not c:
1983 break
1984 reads += c
1985 self.assertEquals(reads, "AA\nBB")
1986
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001987 def test_readlines(self):
1988 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
1989 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
1990 txt.seek(0)
1991 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
1992 txt.seek(0)
1993 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
1994
Christian Heimes1a6387e2008-03-26 12:49:49 +00001995 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00001996 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001997 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00001998 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001999 reads = ""
2000 while True:
2001 c = txt.read(128)
2002 if not c:
2003 break
2004 reads += c
2005 self.assertEquals(reads, "A"*127+"\nB")
2006
2007 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002008 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002009
2010 # read one char at a time
2011 reads = ""
2012 while True:
2013 c = txt.read(1)
2014 if not c:
2015 break
2016 reads += c
2017 self.assertEquals(reads, self.normalized)
2018
2019 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002020 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002021 txt._CHUNK_SIZE = 4
2022
2023 reads = ""
2024 while True:
2025 c = txt.read(4)
2026 if not c:
2027 break
2028 reads += c
2029 self.assertEquals(reads, self.normalized)
2030
2031 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002032 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002033 txt._CHUNK_SIZE = 4
2034
2035 reads = txt.read(4)
2036 reads += txt.read(4)
2037 reads += txt.readline()
2038 reads += txt.readline()
2039 reads += txt.readline()
2040 self.assertEquals(reads, self.normalized)
2041
2042 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002043 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002044 txt._CHUNK_SIZE = 4
2045
2046 reads = txt.read(4)
2047 reads += txt.read()
2048 self.assertEquals(reads, self.normalized)
2049
2050 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002051 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002052 txt._CHUNK_SIZE = 4
2053
2054 reads = txt.read(4)
2055 pos = txt.tell()
2056 txt.seek(0)
2057 txt.seek(pos)
2058 self.assertEquals(txt.read(4), "BBB\n")
2059
2060 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002061 buffer = self.BytesIO(self.testdata)
2062 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002063
2064 self.assertEqual(buffer.seekable(), txt.seekable())
2065
Antoine Pitrou19690592009-06-12 20:14:08 +00002066 @unittest.skip("Issue #6213 with incremental encoders")
2067 def test_append_bom(self):
2068 # The BOM is not written again when appending to a non-empty file
2069 filename = support.TESTFN
2070 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2071 with self.open(filename, 'w', encoding=charset) as f:
2072 f.write('aaa')
2073 pos = f.tell()
2074 with self.open(filename, 'rb') as f:
2075 self.assertEquals(f.read(), 'aaa'.encode(charset))
2076
2077 with self.open(filename, 'a', encoding=charset) as f:
2078 f.write('xxx')
2079 with self.open(filename, 'rb') as f:
2080 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
2081
2082 @unittest.skip("Issue #6213 with incremental encoders")
2083 def test_seek_bom(self):
2084 # Same test, but when seeking manually
2085 filename = support.TESTFN
2086 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2087 with self.open(filename, 'w', encoding=charset) as f:
2088 f.write('aaa')
2089 pos = f.tell()
2090 with self.open(filename, 'r+', encoding=charset) as f:
2091 f.seek(pos)
2092 f.write('zzz')
2093 f.seek(0)
2094 f.write('bbb')
2095 with self.open(filename, 'rb') as f:
2096 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2097
2098 def test_errors_property(self):
2099 with self.open(support.TESTFN, "w") as f:
2100 self.assertEqual(f.errors, "strict")
2101 with self.open(support.TESTFN, "w", errors="replace") as f:
2102 self.assertEqual(f.errors, "replace")
2103
2104
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002105 def test_threads_write(self):
2106 # Issue6750: concurrent writes could duplicate data
2107 event = threading.Event()
2108 with self.open(support.TESTFN, "w", buffering=1) as f:
2109 def run(n):
2110 text = "Thread%03d\n" % n
2111 event.wait()
2112 f.write(text)
2113 threads = [threading.Thread(target=lambda n=x: run(n))
2114 for x in range(20)]
2115 for t in threads:
2116 t.start()
2117 time.sleep(0.02)
2118 event.set()
2119 for t in threads:
2120 t.join()
2121 with self.open(support.TESTFN) as f:
2122 content = f.read()
2123 for n in range(20):
2124 self.assertEquals(content.count("Thread%03d\n" % n), 1)
2125
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest plus checks specific to the C implementation."""

    def test_initialization(self):
        # After a failed __init__ call, read() on the wrapper must raise
        # ValueError.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.read)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
        self.assertRaises(ValueError, t.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        b = self.BufferedWriter(rawio)
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("456def")
        # Create a reference cycle so only the cyclic GC can free the object.
        t.x = t
        wr = weakref.ref(t)
        del t
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
2152
class PyTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest with no additional test methods of its own."""
2155
2156
class IncrementalNewlineDecoderTest(unittest.TestCase):
    """Tests for IncrementalNewlineDecoder.

    The class under test is read from ``self.IncrementalNewlineDecoder``,
    which this class does not define itself.
    """

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        _check_decode(b'\xe8\xa2\x88', "\u8888")

        # The same character fed one byte at a time.
        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        # A truncated sequence must raise once final=True is passed.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_decode(b'\n', "\n")
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to `decoder` piecewise and check both the translated
        # output and the `newlines` attribute as each kind of line ending
        # is seen.  `encoding` is None when the decoder takes unicode input.
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(b))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertEqual(decoder.newlines, None)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        # After reset() the record of seen newlines must be cleared.
        decoder.reset()
        data = "abc"
        if encoder is not None:
            encoder.reset()
            data = encoder.encode(data)
        self.assertEqual(decoder.decode(data), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is not None:
                inner = codecs.getincrementaldecoder(enc)()
            else:
                inner = None
            decoder = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(decoder, enc)
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 must be returned unchanged and must not be
            # recorded as newlines.
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
            self.assertEqual(dec.newlines, None)
        dec = self.IncrementalNewlineDecoder(None, translate=False)
        _check(dec)
        dec = self.IncrementalNewlineDecoder(None, translate=True)
        _check(dec)
2261 _check(dec)
2262
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest with no additional test methods."""
2265
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest with no additional test methods."""
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002268
Christian Heimes1a6387e2008-03-26 12:49:49 +00002269
2270# XXX Tests for open()
2271
2272class MiscIOTest(unittest.TestCase):
2273
Benjamin Petersonad100c32008-11-20 22:06:22 +00002274 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002275 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002276
Antoine Pitrou19690592009-06-12 20:14:08 +00002277 def test___all__(self):
2278 for name in self.io.__all__:
2279 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002280 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002281 if name == "open":
2282 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002283 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002284 self.assertTrue(issubclass(obj, Exception), name)
2285 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002286 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002287
Benjamin Petersonad100c32008-11-20 22:06:22 +00002288 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002289 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002290 self.assertEquals(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002291 f.close()
2292
Antoine Pitrou19690592009-06-12 20:14:08 +00002293 f = self.open(support.TESTFN, "U")
2294 self.assertEquals(f.name, support.TESTFN)
2295 self.assertEquals(f.buffer.name, support.TESTFN)
2296 self.assertEquals(f.buffer.raw.name, support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002297 self.assertEquals(f.mode, "U")
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002298 self.assertEquals(f.buffer.mode, "rb")
2299 self.assertEquals(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002300 f.close()
2301
Antoine Pitrou19690592009-06-12 20:14:08 +00002302 f = self.open(support.TESTFN, "w+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002303 self.assertEquals(f.mode, "w+")
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002304 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2305 self.assertEquals(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002306
Antoine Pitrou19690592009-06-12 20:14:08 +00002307 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002308 self.assertEquals(g.mode, "wb")
2309 self.assertEquals(g.raw.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002310 self.assertEquals(g.name, f.fileno())
2311 self.assertEquals(g.raw.name, f.fileno())
2312 f.close()
2313 g.close()
2314
Antoine Pitrou19690592009-06-12 20:14:08 +00002315 def test_io_after_close(self):
2316 for kwargs in [
2317 {"mode": "w"},
2318 {"mode": "wb"},
2319 {"mode": "w", "buffering": 1},
2320 {"mode": "w", "buffering": 2},
2321 {"mode": "wb", "buffering": 0},
2322 {"mode": "r"},
2323 {"mode": "rb"},
2324 {"mode": "r", "buffering": 1},
2325 {"mode": "r", "buffering": 2},
2326 {"mode": "rb", "buffering": 0},
2327 {"mode": "w+"},
2328 {"mode": "w+b"},
2329 {"mode": "w+", "buffering": 1},
2330 {"mode": "w+", "buffering": 2},
2331 {"mode": "w+b", "buffering": 0},
2332 ]:
2333 f = self.open(support.TESTFN, **kwargs)
2334 f.close()
2335 self.assertRaises(ValueError, f.flush)
2336 self.assertRaises(ValueError, f.fileno)
2337 self.assertRaises(ValueError, f.isatty)
2338 self.assertRaises(ValueError, f.__iter__)
2339 if hasattr(f, "peek"):
2340 self.assertRaises(ValueError, f.peek, 1)
2341 self.assertRaises(ValueError, f.read)
2342 if hasattr(f, "read1"):
2343 self.assertRaises(ValueError, f.read1, 1024)
2344 if hasattr(f, "readinto"):
2345 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2346 self.assertRaises(ValueError, f.readline)
2347 self.assertRaises(ValueError, f.readlines)
2348 self.assertRaises(ValueError, f.seek, 0)
2349 self.assertRaises(ValueError, f.tell)
2350 self.assertRaises(ValueError, f.truncate)
2351 self.assertRaises(ValueError, f.write,
2352 b"" if "b" in kwargs['mode'] else "")
2353 self.assertRaises(ValueError, f.writelines, [])
2354 self.assertRaises(ValueError, next, f)
2355
2356 def test_blockingioerror(self):
2357 # Various BlockingIOError issues
2358 self.assertRaises(TypeError, self.BlockingIOError)
2359 self.assertRaises(TypeError, self.BlockingIOError, 1)
2360 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2361 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2362 b = self.BlockingIOError(1, "")
2363 self.assertEqual(b.characters_written, 0)
2364 class C(unicode):
2365 pass
2366 c = C("")
2367 b = self.BlockingIOError(1, c)
2368 c.b = b
2369 b.c = c
2370 wr = weakref.ref(c)
2371 del c, b
2372 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002373 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002374
2375 def test_abcs(self):
2376 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002377 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2378 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2379 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2380 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002381
2382 def _check_abc_inheritance(self, abcmodule):
2383 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002384 self.assertIsInstance(f, abcmodule.IOBase)
2385 self.assertIsInstance(f, abcmodule.RawIOBase)
2386 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2387 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002388 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002389 self.assertIsInstance(f, abcmodule.IOBase)
2390 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2391 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2392 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002393 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002394 self.assertIsInstance(f, abcmodule.IOBase)
2395 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2396 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2397 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002398
2399 def test_abc_inheritance(self):
2400 # Test implementations inherit from their respective ABCs
2401 self._check_abc_inheritance(self)
2402
2403 def test_abc_inheritance_official(self):
2404 # Test implementations inherit from the official ABCs of the
2405 # baseline "io" module.
2406 self._check_abc_inheritance(io)
2407
class CMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``io`` module."""

    # Module under test, read by the inherited tests as ``self.io``.
    io = io
2410
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``pyio`` module."""

    # Module under test, read by the inherited tests as ``self.io``.
    io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002413
def test_main():
    """Prepare the test classes and run them with support.run_unittest().

    Before running, the names of the IO module being tested, plus some
    useful mock classes, are set as attributes on each test class: classes
    whose name starts with "C" receive the ``io`` names, classes whose name
    starts with "Py" receive the ``pyio`` names, and other classes are left
    untouched.
    """
    tests = (
        CIOTest, PyIOTest,
        CBufferedReaderTest, PyBufferedReaderTest,
        CBufferedWriterTest, PyBufferedWriterTest,
        CBufferedRWPairTest, PyBufferedRWPairTest,
        CBufferedRandomTest, PyBufferedRandomTest,
        StatefulIncrementalDecoderTest,
        CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
        CTextIOWrapperTest, PyTextIOWrapperTest,
        CMiscIOTest, PyMiscIOTest,
    )
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO)

    # One namespace per implementation, holding the public module names
    # plus IncrementalNewlineDecoder.
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}

    # Each mock is published under its generic name but resolved to the
    # "C"- or "Py"-prefixed class defined at module level.
    module_globals = globals()
    for mock in mocks:
        c_io_ns[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mocks:
        py_io_ns[mock.__name__] = module_globals["Py" + mock.__name__]

    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        test_name = test.__name__
        if test_name.startswith("C"):
            namespace = c_io_ns
        elif test_name.startswith("Py"):
            namespace = py_io_ns
        else:
            # Not tied to either implementation; nothing to inject.
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002447
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()