blob: 9ffe646ea9fd2b67e8765efd9f1dad69862f0bdc [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import threading
30import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000031import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000032import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000033import abc
Georg Brandla4f46e12010-02-07 17:03:15 +000034from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000035from collections import deque
36from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000037
38import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
41
42__metaclass__ = type
43bytes = support.py3k_bytes
44
45def _default_chunk_size():
46 """Get the default TextIOWrapper chunk size"""
47 with io.open(__file__, "r", encoding="latin1") as f:
48 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000049
50
Antoine Pitrou19690592009-06-12 20:14:08 +000051class MockRawIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +000052
53 def __init__(self, read_stack=()):
54 self._read_stack = list(read_stack)
55 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000056 self._reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000057
58 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +000059 self._reads += 1
Christian Heimes1a6387e2008-03-26 12:49:49 +000060 try:
61 return self._read_stack.pop(0)
62 except:
63 return b""
64
65 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000066 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000067 return len(b)
68
69 def writable(self):
70 return True
71
72 def fileno(self):
73 return 42
74
75 def readable(self):
76 return True
77
78 def seekable(self):
79 return True
80
81 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000082 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000083
84 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000085 return 0 # same comment as above
86
87 def readinto(self, buf):
88 self._reads += 1
89 max_len = len(buf)
90 try:
91 data = self._read_stack[0]
92 except IndexError:
93 return 0
94 if data is None:
95 del self._read_stack[0]
96 return None
97 n = len(data)
98 if len(data) <= max_len:
99 del self._read_stack[0]
100 buf[:n] = data
101 return n
102 else:
103 buf[:] = data[:max_len]
104 self._read_stack[0] = data[max_len:]
105 return max_len
106
107 def truncate(self, pos=None):
108 return pos
109
110class CMockRawIO(MockRawIO, io.RawIOBase):
111 pass
112
113class PyMockRawIO(MockRawIO, pyio.RawIOBase):
114 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000115
116
Antoine Pitrou19690592009-06-12 20:14:08 +0000117class MisbehavedRawIO(MockRawIO):
118 def write(self, b):
119 return MockRawIO.write(self, b) * 2
120
121 def read(self, n=None):
122 return MockRawIO.read(self, n) * 2
123
124 def seek(self, pos, whence):
125 return -123
126
127 def tell(self):
128 return -456
129
130 def readinto(self, buf):
131 MockRawIO.readinto(self, buf)
132 return len(buf) * 5
133
134class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
135 pass
136
137class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
138 pass
139
140
141class CloseFailureIO(MockRawIO):
142 closed = 0
143
144 def close(self):
145 if not self.closed:
146 self.closed = 1
147 raise IOError
148
149class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
150 pass
151
152class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
153 pass
154
155
156class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000157
158 def __init__(self, data):
159 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000160 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000161
162 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000163 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000164 self.read_history.append(None if res is None else len(res))
165 return res
166
Antoine Pitrou19690592009-06-12 20:14:08 +0000167 def readinto(self, b):
168 res = super(MockFileIO, self).readinto(b)
169 self.read_history.append(res)
170 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000171
Antoine Pitrou19690592009-06-12 20:14:08 +0000172class CMockFileIO(MockFileIO, io.BytesIO):
173 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000174
Antoine Pitrou19690592009-06-12 20:14:08 +0000175class PyMockFileIO(MockFileIO, pyio.BytesIO):
176 pass
177
178
179class MockNonBlockWriterIO:
180
181 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000182 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000183 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000184
Antoine Pitrou19690592009-06-12 20:14:08 +0000185 def pop_written(self):
186 s = b"".join(self._write_stack)
187 self._write_stack[:] = []
188 return s
189
190 def block_on(self, char):
191 """Block when a given char is encountered."""
192 self._blocker_char = char
193
194 def readable(self):
195 return True
196
197 def seekable(self):
198 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000199
200 def writable(self):
201 return True
202
Antoine Pitrou19690592009-06-12 20:14:08 +0000203 def write(self, b):
204 b = bytes(b)
205 n = -1
206 if self._blocker_char:
207 try:
208 n = b.index(self._blocker_char)
209 except ValueError:
210 pass
211 else:
212 self._blocker_char = None
213 self._write_stack.append(b[:n])
214 raise self.BlockingIOError(0, "test blocking", n)
215 self._write_stack.append(b)
216 return len(b)
217
218class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
219 BlockingIOError = io.BlockingIOError
220
221class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
222 BlockingIOError = pyio.BlockingIOError
223
Christian Heimes1a6387e2008-03-26 12:49:49 +0000224
225class IOTest(unittest.TestCase):
226
Antoine Pitrou19690592009-06-12 20:14:08 +0000227 def setUp(self):
228 support.unlink(support.TESTFN)
229
Christian Heimes1a6387e2008-03-26 12:49:49 +0000230 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000231 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000232
233 def write_ops(self, f):
234 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000235 f.truncate(0)
236 self.assertEqual(f.tell(), 5)
237 f.seek(0)
238
239 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000240 self.assertEqual(f.seek(0), 0)
241 self.assertEqual(f.write(b"Hello."), 6)
242 self.assertEqual(f.tell(), 6)
243 self.assertEqual(f.seek(-1, 1), 5)
244 self.assertEqual(f.tell(), 5)
245 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
246 self.assertEqual(f.seek(0), 0)
247 self.assertEqual(f.write(b"h"), 1)
248 self.assertEqual(f.seek(-1, 2), 13)
249 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000250
Christian Heimes1a6387e2008-03-26 12:49:49 +0000251 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000252 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000253 self.assertRaises(TypeError, f.seek, 0.0)
254
255 def read_ops(self, f, buffered=False):
256 data = f.read(5)
257 self.assertEqual(data, b"hello")
258 data = bytearray(data)
259 self.assertEqual(f.readinto(data), 5)
260 self.assertEqual(data, b" worl")
261 self.assertEqual(f.readinto(data), 2)
262 self.assertEqual(len(data), 5)
263 self.assertEqual(data[:2], b"d\n")
264 self.assertEqual(f.seek(0), 0)
265 self.assertEqual(f.read(20), b"hello world\n")
266 self.assertEqual(f.read(1), b"")
267 self.assertEqual(f.readinto(bytearray(b"x")), 0)
268 self.assertEqual(f.seek(-6, 2), 6)
269 self.assertEqual(f.read(5), b"world")
270 self.assertEqual(f.read(0), b"")
271 self.assertEqual(f.readinto(bytearray()), 0)
272 self.assertEqual(f.seek(-6, 1), 5)
273 self.assertEqual(f.read(5), b" worl")
274 self.assertEqual(f.tell(), 10)
275 self.assertRaises(TypeError, f.seek, 0.0)
276 if buffered:
277 f.seek(0)
278 self.assertEqual(f.read(), b"hello world\n")
279 f.seek(6)
280 self.assertEqual(f.read(), b"world\n")
281 self.assertEqual(f.read(), b"")
282
283 LARGE = 2**31
284
285 def large_file_ops(self, f):
286 assert f.readable()
287 assert f.writable()
288 self.assertEqual(f.seek(self.LARGE), self.LARGE)
289 self.assertEqual(f.tell(), self.LARGE)
290 self.assertEqual(f.write(b"xxx"), 3)
291 self.assertEqual(f.tell(), self.LARGE + 3)
292 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
293 self.assertEqual(f.truncate(), self.LARGE + 2)
294 self.assertEqual(f.tell(), self.LARGE + 2)
295 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
296 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000297 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000298 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
299 self.assertEqual(f.seek(-1, 2), self.LARGE)
300 self.assertEqual(f.read(2), b"x")
301
Antoine Pitrou19690592009-06-12 20:14:08 +0000302 def test_invalid_operations(self):
303 # Try writing on a file opened in read mode and vice-versa.
304 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000305 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000306 self.assertRaises(IOError, fp.read)
307 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000308 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000309 self.assertRaises(IOError, fp.write, b"blah")
310 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000311 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000312 self.assertRaises(IOError, fp.write, "blah")
313 self.assertRaises(IOError, fp.writelines, ["blah\n"])
314
Christian Heimes1a6387e2008-03-26 12:49:49 +0000315 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000316 with self.open(support.TESTFN, "wb", buffering=0) as f:
317 self.assertEqual(f.readable(), False)
318 self.assertEqual(f.writable(), True)
319 self.assertEqual(f.seekable(), True)
320 self.write_ops(f)
321 with self.open(support.TESTFN, "rb", buffering=0) as f:
322 self.assertEqual(f.readable(), True)
323 self.assertEqual(f.writable(), False)
324 self.assertEqual(f.seekable(), True)
325 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000326
327 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000328 with self.open(support.TESTFN, "wb") as f:
329 self.assertEqual(f.readable(), False)
330 self.assertEqual(f.writable(), True)
331 self.assertEqual(f.seekable(), True)
332 self.write_ops(f)
333 with self.open(support.TESTFN, "rb") as f:
334 self.assertEqual(f.readable(), True)
335 self.assertEqual(f.writable(), False)
336 self.assertEqual(f.seekable(), True)
337 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000338
339 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000340 with self.open(support.TESTFN, "wb") as f:
341 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
342 with self.open(support.TESTFN, "rb") as f:
343 self.assertEqual(f.readline(), b"abc\n")
344 self.assertEqual(f.readline(10), b"def\n")
345 self.assertEqual(f.readline(2), b"xy")
346 self.assertEqual(f.readline(4), b"zzy\n")
347 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000348 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000349 self.assertRaises(TypeError, f.readline, 5.3)
350 with self.open(support.TESTFN, "r") as f:
351 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000352
353 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000354 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000355 self.write_ops(f)
356 data = f.getvalue()
357 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000358 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000359 self.read_ops(f, True)
360
361 def test_large_file_ops(self):
362 # On Windows and Mac OSX this test comsumes large resources; It takes
363 # a long time to build the >2GB file and takes >2GB of disk space
364 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000365 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
366 if not support.is_resource_enabled("largefile"):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000367 print("\nTesting large file ops skipped on %s." % sys.platform,
368 file=sys.stderr)
369 print("It requires %d bytes and a long time." % self.LARGE,
370 file=sys.stderr)
371 print("Use 'regrtest.py -u largefile test_io' to run it.",
372 file=sys.stderr)
373 return
Antoine Pitrou19690592009-06-12 20:14:08 +0000374 with self.open(support.TESTFN, "w+b", 0) as f:
375 self.large_file_ops(f)
376 with self.open(support.TESTFN, "w+b") as f:
377 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000378
379 def test_with_open(self):
380 for bufsize in (0, 1, 100):
381 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000382 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000383 f.write(b"xxx")
384 self.assertEqual(f.closed, True)
385 f = None
386 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000387 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000388 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000389 except ZeroDivisionError:
390 self.assertEqual(f.closed, True)
391 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000392 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000393
Antoine Pitroue741cc62009-01-21 00:45:36 +0000394 # issue 5008
395 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000396 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000397 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000398 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000399 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000400 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000401 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000402 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000403 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000404
Christian Heimes1a6387e2008-03-26 12:49:49 +0000405 def test_destructor(self):
406 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000407 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000408 def __del__(self):
409 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000410 try:
411 f = super(MyFileIO, self).__del__
412 except AttributeError:
413 pass
414 else:
415 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000416 def close(self):
417 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000418 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000419 def flush(self):
420 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000421 super(MyFileIO, self).flush()
422 f = MyFileIO(support.TESTFN, "wb")
423 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000424 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000425 support.gc_collect()
426 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000427 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000428 self.assertEqual(f.read(), b"xxx")
429
430 def _check_base_destructor(self, base):
431 record = []
432 class MyIO(base):
433 def __init__(self):
434 # This exercises the availability of attributes on object
435 # destruction.
436 # (in the C version, close() is called by the tp_dealloc
437 # function, not by __del__)
438 self.on_del = 1
439 self.on_close = 2
440 self.on_flush = 3
441 def __del__(self):
442 record.append(self.on_del)
443 try:
444 f = super(MyIO, self).__del__
445 except AttributeError:
446 pass
447 else:
448 f()
449 def close(self):
450 record.append(self.on_close)
451 super(MyIO, self).close()
452 def flush(self):
453 record.append(self.on_flush)
454 super(MyIO, self).flush()
455 f = MyIO()
456 del f
457 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000458 self.assertEqual(record, [1, 2, 3])
459
Antoine Pitrou19690592009-06-12 20:14:08 +0000460 def test_IOBase_destructor(self):
461 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000462
Antoine Pitrou19690592009-06-12 20:14:08 +0000463 def test_RawIOBase_destructor(self):
464 self._check_base_destructor(self.RawIOBase)
465
466 def test_BufferedIOBase_destructor(self):
467 self._check_base_destructor(self.BufferedIOBase)
468
469 def test_TextIOBase_destructor(self):
470 self._check_base_destructor(self.TextIOBase)
471
472 def test_close_flushes(self):
473 with self.open(support.TESTFN, "wb") as f:
474 f.write(b"xxx")
475 with self.open(support.TESTFN, "rb") as f:
476 self.assertEqual(f.read(), b"xxx")
477
478 def test_array_writes(self):
479 a = array.array(b'i', range(10))
480 n = len(a.tostring())
481 with self.open(support.TESTFN, "wb", 0) as f:
482 self.assertEqual(f.write(a), n)
483 with self.open(support.TESTFN, "wb") as f:
484 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000485
486 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000487 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000488 closefd=False)
489
Antoine Pitrou19690592009-06-12 20:14:08 +0000490 def test_read_closed(self):
491 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000492 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000493 with self.open(support.TESTFN, "r") as f:
494 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000495 self.assertEqual(file.read(), "egg\n")
496 file.seek(0)
497 file.close()
498 self.assertRaises(ValueError, file.read)
499
500 def test_no_closefd_with_filename(self):
501 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000502 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000503
504 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000505 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000506 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000507 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000508 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000509 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000510 self.assertEqual(file.buffer.raw.closefd, False)
511
Antoine Pitrou19690592009-06-12 20:14:08 +0000512 def test_garbage_collection(self):
513 # FileIO objects are collected, and collecting them flushes
514 # all data to disk.
515 f = self.FileIO(support.TESTFN, "wb")
516 f.write(b"abcxxx")
517 f.f = f
518 wr = weakref.ref(f)
519 del f
520 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000521 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000522 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000523 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000524
Antoine Pitrou19690592009-06-12 20:14:08 +0000525 def test_unbounded_file(self):
526 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
527 zero = "/dev/zero"
528 if not os.path.exists(zero):
529 self.skipTest("{0} does not exist".format(zero))
530 if sys.maxsize > 0x7FFFFFFF:
531 self.skipTest("test can only run in a 32-bit address space")
532 if support.real_max_memuse < support._2G:
533 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000534 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000535 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000536 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000537 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000538 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000539 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000540
Antoine Pitrou19690592009-06-12 20:14:08 +0000541class CIOTest(IOTest):
542 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000543
Antoine Pitrou19690592009-06-12 20:14:08 +0000544class PyIOTest(IOTest):
545 test_array_writes = unittest.skip(
546 "len(array.array) returns number of elements rather than bytelength"
547 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000548
549
Antoine Pitrou19690592009-06-12 20:14:08 +0000550class CommonBufferedTests:
551 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
552
553 def test_detach(self):
554 raw = self.MockRawIO()
555 buf = self.tp(raw)
556 self.assertIs(buf.detach(), raw)
557 self.assertRaises(ValueError, buf.detach)
558
559 def test_fileno(self):
560 rawio = self.MockRawIO()
561 bufio = self.tp(rawio)
562
563 self.assertEquals(42, bufio.fileno())
564
565 def test_no_fileno(self):
566 # XXX will we always have fileno() function? If so, kill
567 # this test. Else, write it.
568 pass
569
570 def test_invalid_args(self):
571 rawio = self.MockRawIO()
572 bufio = self.tp(rawio)
573 # Invalid whence
574 self.assertRaises(ValueError, bufio.seek, 0, -1)
575 self.assertRaises(ValueError, bufio.seek, 0, 3)
576
577 def test_override_destructor(self):
578 tp = self.tp
579 record = []
580 class MyBufferedIO(tp):
581 def __del__(self):
582 record.append(1)
583 try:
584 f = super(MyBufferedIO, self).__del__
585 except AttributeError:
586 pass
587 else:
588 f()
589 def close(self):
590 record.append(2)
591 super(MyBufferedIO, self).close()
592 def flush(self):
593 record.append(3)
594 super(MyBufferedIO, self).flush()
595 rawio = self.MockRawIO()
596 bufio = MyBufferedIO(rawio)
597 writable = bufio.writable()
598 del bufio
599 support.gc_collect()
600 if writable:
601 self.assertEqual(record, [1, 2, 3])
602 else:
603 self.assertEqual(record, [1, 2])
604
605 def test_context_manager(self):
606 # Test usability as a context manager
607 rawio = self.MockRawIO()
608 bufio = self.tp(rawio)
609 def _with():
610 with bufio:
611 pass
612 _with()
613 # bufio should now be closed, and using it a second time should raise
614 # a ValueError.
615 self.assertRaises(ValueError, _with)
616
617 def test_error_through_destructor(self):
618 # Test that the exception state is not modified by a destructor,
619 # even if close() fails.
620 rawio = self.CloseFailureIO()
621 def f():
622 self.tp(rawio).xyzzy
623 with support.captured_output("stderr") as s:
624 self.assertRaises(AttributeError, f)
625 s = s.getvalue().strip()
626 if s:
627 # The destructor *may* have printed an unraisable error, check it
628 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000629 self.assertTrue(s.startswith("Exception IOError: "), s)
630 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000631
632 def test_repr(self):
633 raw = self.MockRawIO()
634 b = self.tp(raw)
635 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
636 self.assertEqual(repr(b), "<%s>" % clsname)
637 raw.name = "dummy"
638 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
639 raw.name = b"dummy"
640 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000641
642
Antoine Pitrou19690592009-06-12 20:14:08 +0000643class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
644 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000645
Antoine Pitrou19690592009-06-12 20:14:08 +0000646 def test_constructor(self):
647 rawio = self.MockRawIO([b"abc"])
648 bufio = self.tp(rawio)
649 bufio.__init__(rawio)
650 bufio.__init__(rawio, buffer_size=1024)
651 bufio.__init__(rawio, buffer_size=16)
652 self.assertEquals(b"abc", bufio.read())
653 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
654 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
655 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
656 rawio = self.MockRawIO([b"abc"])
657 bufio.__init__(rawio)
658 self.assertEquals(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000659
Antoine Pitrou19690592009-06-12 20:14:08 +0000660 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000661 for arg in (None, 7):
662 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
663 bufio = self.tp(rawio)
664 self.assertEquals(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000665 # Invalid args
666 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000667
Antoine Pitrou19690592009-06-12 20:14:08 +0000668 def test_read1(self):
669 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
670 bufio = self.tp(rawio)
671 self.assertEquals(b"a", bufio.read(1))
672 self.assertEquals(b"b", bufio.read1(1))
673 self.assertEquals(rawio._reads, 1)
674 self.assertEquals(b"c", bufio.read1(100))
675 self.assertEquals(rawio._reads, 1)
676 self.assertEquals(b"d", bufio.read1(100))
677 self.assertEquals(rawio._reads, 2)
678 self.assertEquals(b"efg", bufio.read1(100))
679 self.assertEquals(rawio._reads, 3)
680 self.assertEquals(b"", bufio.read1(100))
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000681 self.assertEquals(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000682 # Invalid args
683 self.assertRaises(ValueError, bufio.read1, -1)
684
685 def test_readinto(self):
686 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
687 bufio = self.tp(rawio)
688 b = bytearray(2)
689 self.assertEquals(bufio.readinto(b), 2)
690 self.assertEquals(b, b"ab")
691 self.assertEquals(bufio.readinto(b), 2)
692 self.assertEquals(b, b"cd")
693 self.assertEquals(bufio.readinto(b), 2)
694 self.assertEquals(b, b"ef")
695 self.assertEquals(bufio.readinto(b), 1)
696 self.assertEquals(b, b"gf")
697 self.assertEquals(bufio.readinto(b), 0)
698 self.assertEquals(b, b"gf")
699
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000700 def test_readlines(self):
701 def bufio():
702 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
703 return self.tp(rawio)
704 self.assertEquals(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
705 self.assertEquals(bufio().readlines(5), [b"abc\n", b"d\n"])
706 self.assertEquals(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
707
Antoine Pitrou19690592009-06-12 20:14:08 +0000708 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000709 data = b"abcdefghi"
710 dlen = len(data)
711
712 tests = [
713 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
714 [ 100, [ 3, 3, 3], [ dlen ] ],
715 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
716 ]
717
718 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000719 rawio = self.MockFileIO(data)
720 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000721 pos = 0
722 for nbytes in buf_read_sizes:
723 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
724 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000725 # this is mildly implementation-dependent
Christian Heimes1a6387e2008-03-26 12:49:49 +0000726 self.assertEquals(rawio.read_history, raw_read_sizes)
727
Antoine Pitrou19690592009-06-12 20:14:08 +0000728 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000729 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000730 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
731 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000732
733 self.assertEquals(b"abcd", bufio.read(6))
734 self.assertEquals(b"e", bufio.read(1))
735 self.assertEquals(b"fg", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000736 self.assertEquals(b"", bufio.peek(1))
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000737 self.assertTrue(None is bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000738 self.assertEquals(b"", bufio.read())
739
Antoine Pitrou19690592009-06-12 20:14:08 +0000740 def test_read_past_eof(self):
741 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
742 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000743
744 self.assertEquals(b"abcdefg", bufio.read(9000))
745
Antoine Pitrou19690592009-06-12 20:14:08 +0000746 def test_read_all(self):
747 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
748 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000749
750 self.assertEquals(b"abcdefg", bufio.read())
751
Antoine Pitrou19690592009-06-12 20:14:08 +0000752 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000753 try:
754 # Write out many bytes with exactly the same number of 0's,
755 # 1's... 255's. This will help us check that concurrent reading
756 # doesn't duplicate or forget contents.
757 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000758 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000759 random.shuffle(l)
760 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000761 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000762 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000763 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000764 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000765 errors = []
766 results = []
767 def f():
768 try:
769 # Intra-buffer read then buffer-flushing read
770 for n in cycle([1, 19]):
771 s = bufio.read(n)
772 if not s:
773 break
774 # list.append() is atomic
775 results.append(s)
776 except Exception as e:
777 errors.append(e)
778 raise
779 threads = [threading.Thread(target=f) for x in range(20)]
780 for t in threads:
781 t.start()
782 time.sleep(0.02) # yield
783 for t in threads:
784 t.join()
785 self.assertFalse(errors,
786 "the following exceptions were caught: %r" % errors)
787 s = b''.join(results)
788 for i in range(256):
789 c = bytes(bytearray([i]))
790 self.assertEqual(s.count(c), N)
791 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000792 support.unlink(support.TESTFN)
793
794 def test_misbehaved_io(self):
795 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
796 bufio = self.tp(rawio)
797 self.assertRaises(IOError, bufio.seek, 0)
798 self.assertRaises(IOError, bufio.tell)
799
800class CBufferedReaderTest(BufferedReaderTest):
801 tp = io.BufferedReader
802
803 def test_constructor(self):
804 BufferedReaderTest.test_constructor(self)
805 # The allocation can succeed on 32-bit builds, e.g. with more
806 # than 2GB RAM and a 64-bit kernel.
807 if sys.maxsize > 0x7FFFFFFF:
808 rawio = self.MockRawIO()
809 bufio = self.tp(rawio)
810 self.assertRaises((OverflowError, MemoryError, ValueError),
811 bufio.__init__, rawio, sys.maxsize)
812
813 def test_initialization(self):
814 rawio = self.MockRawIO([b"abc"])
815 bufio = self.tp(rawio)
816 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
817 self.assertRaises(ValueError, bufio.read)
818 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
819 self.assertRaises(ValueError, bufio.read)
820 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
821 self.assertRaises(ValueError, bufio.read)
822
823 def test_misbehaved_io_read(self):
824 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
825 bufio = self.tp(rawio)
826 # _pyio.BufferedReader seems to implement reading different, so that
827 # checking this is not so easy.
828 self.assertRaises(IOError, bufio.read, 10)
829
830 def test_garbage_collection(self):
831 # C BufferedReader objects are collected.
832 # The Python version has __del__, so it ends into gc.garbage instead
833 rawio = self.FileIO(support.TESTFN, "w+b")
834 f = self.tp(rawio)
835 f.f = f
836 wr = weakref.ref(f)
837 del f
838 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000839 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000840
841class PyBufferedReaderTest(BufferedReaderTest):
842 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000843
844
Antoine Pitrou19690592009-06-12 20:14:08 +0000845class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
846 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000847
Antoine Pitrou19690592009-06-12 20:14:08 +0000848 def test_constructor(self):
849 rawio = self.MockRawIO()
850 bufio = self.tp(rawio)
851 bufio.__init__(rawio)
852 bufio.__init__(rawio, buffer_size=1024)
853 bufio.__init__(rawio, buffer_size=16)
854 self.assertEquals(3, bufio.write(b"abc"))
855 bufio.flush()
856 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
857 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
858 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
859 bufio.__init__(rawio)
860 self.assertEquals(3, bufio.write(b"ghi"))
861 bufio.flush()
862 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000863
Antoine Pitrou19690592009-06-12 20:14:08 +0000864 def test_detach_flush(self):
865 raw = self.MockRawIO()
866 buf = self.tp(raw)
867 buf.write(b"howdy!")
868 self.assertFalse(raw._write_stack)
869 buf.detach()
870 self.assertEqual(raw._write_stack, [b"howdy!"])
871
872 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000873 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +0000874 writer = self.MockRawIO()
875 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000876 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000877 self.assertFalse(writer._write_stack)
878
Antoine Pitrou19690592009-06-12 20:14:08 +0000879 def test_write_overflow(self):
880 writer = self.MockRawIO()
881 bufio = self.tp(writer, 8)
882 contents = b"abcdefghijklmnop"
883 for n in range(0, len(contents), 3):
884 bufio.write(contents[n:n+3])
885 flushed = b"".join(writer._write_stack)
886 # At least (total - 8) bytes were implicitly flushed, perhaps more
887 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000888 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000889
Antoine Pitrou19690592009-06-12 20:14:08 +0000890 def check_writes(self, intermediate_func):
891 # Lots of writes, test the flushed output is as expected.
892 contents = bytes(range(256)) * 1000
893 n = 0
894 writer = self.MockRawIO()
895 bufio = self.tp(writer, 13)
896 # Generator of write sizes: repeat each N 15 times then proceed to N+1
897 def gen_sizes():
898 for size in count(1):
899 for i in range(15):
900 yield size
901 sizes = gen_sizes()
902 while n < len(contents):
903 size = min(next(sizes), len(contents) - n)
904 self.assertEquals(bufio.write(contents[n:n+size]), size)
905 intermediate_func(bufio)
906 n += size
907 bufio.flush()
908 self.assertEquals(contents,
909 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000910
Antoine Pitrou19690592009-06-12 20:14:08 +0000911 def test_writes(self):
912 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000913
Antoine Pitrou19690592009-06-12 20:14:08 +0000914 def test_writes_and_flushes(self):
915 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000916
Antoine Pitrou19690592009-06-12 20:14:08 +0000917 def test_writes_and_seeks(self):
918 def _seekabs(bufio):
919 pos = bufio.tell()
920 bufio.seek(pos + 1, 0)
921 bufio.seek(pos - 1, 0)
922 bufio.seek(pos, 0)
923 self.check_writes(_seekabs)
924 def _seekrel(bufio):
925 pos = bufio.seek(0, 1)
926 bufio.seek(+1, 1)
927 bufio.seek(-1, 1)
928 bufio.seek(pos, 0)
929 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000930
Antoine Pitrou19690592009-06-12 20:14:08 +0000931 def test_writes_and_truncates(self):
932 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000933
Antoine Pitrou19690592009-06-12 20:14:08 +0000934 def test_write_non_blocking(self):
935 raw = self.MockNonBlockWriterIO()
936 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000937
Antoine Pitrou19690592009-06-12 20:14:08 +0000938 self.assertEquals(bufio.write(b"abcd"), 4)
939 self.assertEquals(bufio.write(b"efghi"), 5)
940 # 1 byte will be written, the rest will be buffered
941 raw.block_on(b"k")
942 self.assertEquals(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000943
Antoine Pitrou19690592009-06-12 20:14:08 +0000944 # 8 bytes will be written, 8 will be buffered and the rest will be lost
945 raw.block_on(b"0")
946 try:
947 bufio.write(b"opqrwxyz0123456789")
948 except self.BlockingIOError as e:
949 written = e.characters_written
950 else:
951 self.fail("BlockingIOError should have been raised")
952 self.assertEquals(written, 16)
953 self.assertEquals(raw.pop_written(),
954 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000955
Antoine Pitrou19690592009-06-12 20:14:08 +0000956 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
957 s = raw.pop_written()
958 # Previously buffered bytes were flushed
959 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000960
Antoine Pitrou19690592009-06-12 20:14:08 +0000961 def test_write_and_rewind(self):
962 raw = io.BytesIO()
963 bufio = self.tp(raw, 4)
964 self.assertEqual(bufio.write(b"abcdef"), 6)
965 self.assertEqual(bufio.tell(), 6)
966 bufio.seek(0, 0)
967 self.assertEqual(bufio.write(b"XY"), 2)
968 bufio.seek(6, 0)
969 self.assertEqual(raw.getvalue(), b"XYcdef")
970 self.assertEqual(bufio.write(b"123456"), 6)
971 bufio.flush()
972 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000973
Antoine Pitrou19690592009-06-12 20:14:08 +0000974 def test_flush(self):
975 writer = self.MockRawIO()
976 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000977 bufio.write(b"abc")
978 bufio.flush()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000979 self.assertEquals(b"abc", writer._write_stack[0])
980
Antoine Pitrou19690592009-06-12 20:14:08 +0000981 def test_destructor(self):
982 writer = self.MockRawIO()
983 bufio = self.tp(writer, 8)
984 bufio.write(b"abc")
985 del bufio
986 support.gc_collect()
987 self.assertEquals(b"abc", writer._write_stack[0])
988
989 def test_truncate(self):
990 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000991 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000992 bufio = self.tp(raw, 8)
993 bufio.write(b"abcdef")
994 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000995 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000996 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000997 self.assertEqual(f.read(), b"abc")
998
999 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001000 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001001 # Write out many bytes from many threads and test they were
1002 # all flushed.
1003 N = 1000
1004 contents = bytes(range(256)) * N
1005 sizes = cycle([1, 19])
1006 n = 0
1007 queue = deque()
1008 while n < len(contents):
1009 size = next(sizes)
1010 queue.append(contents[n:n+size])
1011 n += size
1012 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001013 # We use a real file object because it allows us to
1014 # exercise situations where the GIL is released before
1015 # writing the buffer to the raw streams. This is in addition
1016 # to concurrency issues due to switching threads in the middle
1017 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001018 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001019 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001020 errors = []
1021 def f():
1022 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001023 while True:
1024 try:
1025 s = queue.popleft()
1026 except IndexError:
1027 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001028 bufio.write(s)
1029 except Exception as e:
1030 errors.append(e)
1031 raise
1032 threads = [threading.Thread(target=f) for x in range(20)]
1033 for t in threads:
1034 t.start()
1035 time.sleep(0.02) # yield
1036 for t in threads:
1037 t.join()
1038 self.assertFalse(errors,
1039 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001040 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001041 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001042 s = f.read()
1043 for i in range(256):
1044 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001045 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001046 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001047
Antoine Pitrou19690592009-06-12 20:14:08 +00001048 def test_misbehaved_io(self):
1049 rawio = self.MisbehavedRawIO()
1050 bufio = self.tp(rawio, 5)
1051 self.assertRaises(IOError, bufio.seek, 0)
1052 self.assertRaises(IOError, bufio.tell)
1053 self.assertRaises(IOError, bufio.write, b"abcdef")
1054
1055 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001056 with support.check_warnings(("max_buffer_size is deprecated",
1057 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001058 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001059
1060
1061class CBufferedWriterTest(BufferedWriterTest):
1062 tp = io.BufferedWriter
1063
1064 def test_constructor(self):
1065 BufferedWriterTest.test_constructor(self)
1066 # The allocation can succeed on 32-bit builds, e.g. with more
1067 # than 2GB RAM and a 64-bit kernel.
1068 if sys.maxsize > 0x7FFFFFFF:
1069 rawio = self.MockRawIO()
1070 bufio = self.tp(rawio)
1071 self.assertRaises((OverflowError, MemoryError, ValueError),
1072 bufio.__init__, rawio, sys.maxsize)
1073
1074 def test_initialization(self):
1075 rawio = self.MockRawIO()
1076 bufio = self.tp(rawio)
1077 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1078 self.assertRaises(ValueError, bufio.write, b"def")
1079 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1080 self.assertRaises(ValueError, bufio.write, b"def")
1081 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1082 self.assertRaises(ValueError, bufio.write, b"def")
1083
1084 def test_garbage_collection(self):
1085 # C BufferedWriter objects are collected, and collecting them flushes
1086 # all data to disk.
1087 # The Python version has __del__, so it ends into gc.garbage instead
1088 rawio = self.FileIO(support.TESTFN, "w+b")
1089 f = self.tp(rawio)
1090 f.write(b"123xxx")
1091 f.x = f
1092 wr = weakref.ref(f)
1093 del f
1094 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001095 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001096 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001097 self.assertEqual(f.read(), b"123xxx")
1098
1099
1100class PyBufferedWriterTest(BufferedWriterTest):
1101 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001102
1103class BufferedRWPairTest(unittest.TestCase):
1104
Antoine Pitrou19690592009-06-12 20:14:08 +00001105 def test_constructor(self):
1106 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001107 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001108
Antoine Pitrou19690592009-06-12 20:14:08 +00001109 def test_detach(self):
1110 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1111 self.assertRaises(self.UnsupportedOperation, pair.detach)
1112
1113 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001114 with support.check_warnings(("max_buffer_size is deprecated",
1115 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001116 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001117
1118 def test_constructor_with_not_readable(self):
1119 class NotReadable(MockRawIO):
1120 def readable(self):
1121 return False
1122
1123 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1124
1125 def test_constructor_with_not_writeable(self):
1126 class NotWriteable(MockRawIO):
1127 def writable(self):
1128 return False
1129
1130 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1131
1132 def test_read(self):
1133 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1134
1135 self.assertEqual(pair.read(3), b"abc")
1136 self.assertEqual(pair.read(1), b"d")
1137 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001138 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1139 self.assertEqual(pair.read(None), b"abc")
1140
1141 def test_readlines(self):
1142 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1143 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1144 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1145 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001146
1147 def test_read1(self):
1148 # .read1() is delegated to the underlying reader object, so this test
1149 # can be shallow.
1150 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1151
1152 self.assertEqual(pair.read1(3), b"abc")
1153
1154 def test_readinto(self):
1155 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1156
1157 data = bytearray(5)
1158 self.assertEqual(pair.readinto(data), 5)
1159 self.assertEqual(data, b"abcde")
1160
1161 def test_write(self):
1162 w = self.MockRawIO()
1163 pair = self.tp(self.MockRawIO(), w)
1164
1165 pair.write(b"abc")
1166 pair.flush()
1167 pair.write(b"def")
1168 pair.flush()
1169 self.assertEqual(w._write_stack, [b"abc", b"def"])
1170
1171 def test_peek(self):
1172 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1173
1174 self.assertTrue(pair.peek(3).startswith(b"abc"))
1175 self.assertEqual(pair.read(3), b"abc")
1176
1177 def test_readable(self):
1178 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1179 self.assertTrue(pair.readable())
1180
1181 def test_writeable(self):
1182 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1183 self.assertTrue(pair.writable())
1184
1185 def test_seekable(self):
1186 # BufferedRWPairs are never seekable, even if their readers and writers
1187 # are.
1188 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1189 self.assertFalse(pair.seekable())
1190
1191 # .flush() is delegated to the underlying writer object and has been
1192 # tested in the test_write method.
1193
1194 def test_close_and_closed(self):
1195 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1196 self.assertFalse(pair.closed)
1197 pair.close()
1198 self.assertTrue(pair.closed)
1199
1200 def test_isatty(self):
1201 class SelectableIsAtty(MockRawIO):
1202 def __init__(self, isatty):
1203 MockRawIO.__init__(self)
1204 self._isatty = isatty
1205
1206 def isatty(self):
1207 return self._isatty
1208
1209 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1210 self.assertFalse(pair.isatty())
1211
1212 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1213 self.assertTrue(pair.isatty())
1214
1215 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1216 self.assertTrue(pair.isatty())
1217
1218 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1219 self.assertTrue(pair.isatty())
1220
1221class CBufferedRWPairTest(BufferedRWPairTest):
1222 tp = io.BufferedRWPair
1223
1224class PyBufferedRWPairTest(BufferedRWPairTest):
1225 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001226
1227
Antoine Pitrou19690592009-06-12 20:14:08 +00001228class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1229 read_mode = "rb+"
1230 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001231
Antoine Pitrou19690592009-06-12 20:14:08 +00001232 def test_constructor(self):
1233 BufferedReaderTest.test_constructor(self)
1234 BufferedWriterTest.test_constructor(self)
1235
1236 def test_read_and_write(self):
1237 raw = self.MockRawIO((b"asdf", b"ghjk"))
1238 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001239
1240 self.assertEqual(b"as", rw.read(2))
1241 rw.write(b"ddd")
1242 rw.write(b"eee")
1243 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001244 self.assertEqual(b"ghjk", rw.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001245 self.assertEquals(b"dddeee", raw._write_stack[0])
1246
Antoine Pitrou19690592009-06-12 20:14:08 +00001247 def test_seek_and_tell(self):
1248 raw = self.BytesIO(b"asdfghjkl")
1249 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001250
1251 self.assertEquals(b"as", rw.read(2))
1252 self.assertEquals(2, rw.tell())
1253 rw.seek(0, 0)
1254 self.assertEquals(b"asdf", rw.read(4))
1255
1256 rw.write(b"asdf")
1257 rw.seek(0, 0)
1258 self.assertEquals(b"asdfasdfl", rw.read())
1259 self.assertEquals(9, rw.tell())
1260 rw.seek(-4, 2)
1261 self.assertEquals(5, rw.tell())
1262 rw.seek(2, 1)
1263 self.assertEquals(7, rw.tell())
1264 self.assertEquals(b"fl", rw.read(11))
1265 self.assertRaises(TypeError, rw.seek, 0.0)
1266
Antoine Pitrou19690592009-06-12 20:14:08 +00001267 def check_flush_and_read(self, read_func):
1268 raw = self.BytesIO(b"abcdefghi")
1269 bufio = self.tp(raw)
1270
1271 self.assertEquals(b"ab", read_func(bufio, 2))
1272 bufio.write(b"12")
1273 self.assertEquals(b"ef", read_func(bufio, 2))
1274 self.assertEquals(6, bufio.tell())
1275 bufio.flush()
1276 self.assertEquals(6, bufio.tell())
1277 self.assertEquals(b"ghi", read_func(bufio))
1278 raw.seek(0, 0)
1279 raw.write(b"XYZ")
1280 # flush() resets the read buffer
1281 bufio.flush()
1282 bufio.seek(0, 0)
1283 self.assertEquals(b"XYZ", read_func(bufio, 3))
1284
1285 def test_flush_and_read(self):
1286 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1287
1288 def test_flush_and_readinto(self):
1289 def _readinto(bufio, n=-1):
1290 b = bytearray(n if n >= 0 else 9999)
1291 n = bufio.readinto(b)
1292 return bytes(b[:n])
1293 self.check_flush_and_read(_readinto)
1294
1295 def test_flush_and_peek(self):
1296 def _peek(bufio, n=-1):
1297 # This relies on the fact that the buffer can contain the whole
1298 # raw stream, otherwise peek() can return less.
1299 b = bufio.peek(n)
1300 if n != -1:
1301 b = b[:n]
1302 bufio.seek(len(b), 1)
1303 return b
1304 self.check_flush_and_read(_peek)
1305
1306 def test_flush_and_write(self):
1307 raw = self.BytesIO(b"abcdefghi")
1308 bufio = self.tp(raw)
1309
1310 bufio.write(b"123")
1311 bufio.flush()
1312 bufio.write(b"45")
1313 bufio.flush()
1314 bufio.seek(0, 0)
1315 self.assertEquals(b"12345fghi", raw.getvalue())
1316 self.assertEquals(b"12345fghi", bufio.read())
1317
1318 def test_threads(self):
1319 BufferedReaderTest.test_threads(self)
1320 BufferedWriterTest.test_threads(self)
1321
1322 def test_writes_and_peek(self):
1323 def _peek(bufio):
1324 bufio.peek(1)
1325 self.check_writes(_peek)
1326 def _peek(bufio):
1327 pos = bufio.tell()
1328 bufio.seek(-1, 1)
1329 bufio.peek(1)
1330 bufio.seek(pos, 0)
1331 self.check_writes(_peek)
1332
1333 def test_writes_and_reads(self):
1334 def _read(bufio):
1335 bufio.seek(-1, 1)
1336 bufio.read(1)
1337 self.check_writes(_read)
1338
1339 def test_writes_and_read1s(self):
1340 def _read1(bufio):
1341 bufio.seek(-1, 1)
1342 bufio.read1(1)
1343 self.check_writes(_read1)
1344
1345 def test_writes_and_readintos(self):
1346 def _read(bufio):
1347 bufio.seek(-1, 1)
1348 bufio.readinto(bytearray(1))
1349 self.check_writes(_read)
1350
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001351 def test_write_after_readahead(self):
1352 # Issue #6629: writing after the buffer was filled by readahead should
1353 # first rewind the raw stream.
1354 for overwrite_size in [1, 5]:
1355 raw = self.BytesIO(b"A" * 10)
1356 bufio = self.tp(raw, 4)
1357 # Trigger readahead
1358 self.assertEqual(bufio.read(1), b"A")
1359 self.assertEqual(bufio.tell(), 1)
1360 # Overwriting should rewind the raw stream if it needs so
1361 bufio.write(b"B" * overwrite_size)
1362 self.assertEqual(bufio.tell(), overwrite_size + 1)
1363 # If the write size was smaller than the buffer size, flush() and
1364 # check that rewind happens.
1365 bufio.flush()
1366 self.assertEqual(bufio.tell(), overwrite_size + 1)
1367 s = raw.getvalue()
1368 self.assertEqual(s,
1369 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1370
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001371 def test_truncate_after_read_or_write(self):
1372 raw = self.BytesIO(b"A" * 10)
1373 bufio = self.tp(raw, 100)
1374 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1375 self.assertEqual(bufio.truncate(), 2)
1376 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1377 self.assertEqual(bufio.truncate(), 4)
1378
Antoine Pitrou19690592009-06-12 20:14:08 +00001379 def test_misbehaved_io(self):
1380 BufferedReaderTest.test_misbehaved_io(self)
1381 BufferedWriterTest.test_misbehaved_io(self)
1382
1383class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
1384 tp = io.BufferedRandom
1385
1386 def test_constructor(self):
1387 BufferedRandomTest.test_constructor(self)
1388 # The allocation can succeed on 32-bit builds, e.g. with more
1389 # than 2GB RAM and a 64-bit kernel.
1390 if sys.maxsize > 0x7FFFFFFF:
1391 rawio = self.MockRawIO()
1392 bufio = self.tp(rawio)
1393 self.assertRaises((OverflowError, MemoryError, ValueError),
1394 bufio.__init__, rawio, sys.maxsize)
1395
1396 def test_garbage_collection(self):
1397 CBufferedReaderTest.test_garbage_collection(self)
1398 CBufferedWriterTest.test_garbage_collection(self)
1399
1400class PyBufferedRandomTest(BufferedRandomTest):
1401 tp = pyio.BufferedRandom
1402
1403
Christian Heimes1a6387e2008-03-26 12:49:49 +00001404# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1405# properties:
1406# - A single output character can correspond to many bytes of input.
1407# - The number of input bytes to complete the character can be
1408# undetermined until the last input byte is received.
1409# - The number of input bytes can vary depending on previous input.
1410# - A single input byte can correspond to many characters of output.
1411# - The number of output characters can be undetermined until the
1412# last input byte is received.
1413# - The number of output characters can vary depending on previous input.
1414
1415class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1416 """
1417 For testing seek/tell behavior with a stateful, buffering decoder.
1418
1419 Input is a sequence of words. Words may be fixed-length (length set
1420 by input) or variable-length (period-terminated). In variable-length
1421 mode, extra periods are ignored. Possible words are:
1422 - 'i' followed by a number sets the input length, I (maximum 99).
1423 When I is set to 0, words are space-terminated.
1424 - 'o' followed by a number sets the output length, O (maximum 99).
1425 - Any other word is converted into a word followed by a period on
1426 the output. The output word consists of the input word truncated
1427 or padded out with hyphens to make its length equal to O. If O
1428 is 0, the word is output verbatim without truncating or padding.
1429 I and O are initially set to 1. When I changes, any buffered input is
1430 re-scanned according to the new I. EOF also terminates the last word.
1431 """
1432
1433 def __init__(self, errors='strict'):
1434 codecs.IncrementalDecoder.__init__(self, errors)
1435 self.reset()
1436
1437 def __repr__(self):
1438 return '<SID %x>' % id(self)
1439
1440 def reset(self):
1441 self.i = 1
1442 self.o = 1
1443 self.buffer = bytearray()
1444
1445 def getstate(self):
1446 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1447 return bytes(self.buffer), i*100 + o
1448
1449 def setstate(self, state):
1450 buffer, io = state
1451 self.buffer = bytearray(buffer)
1452 i, o = divmod(io, 100)
1453 self.i, self.o = i ^ 1, o ^ 1
1454
1455 def decode(self, input, final=False):
1456 output = ''
1457 for b in input:
1458 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001459 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001460 if self.buffer:
1461 output += self.process_word()
1462 else:
1463 self.buffer.append(b)
1464 else: # fixed-length, terminate after self.i bytes
1465 self.buffer.append(b)
1466 if len(self.buffer) == self.i:
1467 output += self.process_word()
1468 if final and self.buffer: # EOF terminates the last word
1469 output += self.process_word()
1470 return output
1471
1472 def process_word(self):
1473 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001474 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001475 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001476 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001477 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1478 else:
1479 output = self.buffer.decode('ascii')
1480 if len(output) < self.o:
1481 output += '-'*self.o # pad out with hyphens
1482 if self.o:
1483 output = output[:self.o] # truncate to output length
1484 output += '.'
1485 self.buffer = bytearray()
1486 return output
1487
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001488 codecEnabled = False
1489
1490 @classmethod
1491 def lookupTestDecoder(cls, name):
1492 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001493 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001494 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001495 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001496 incrementalencoder=None,
1497 streamreader=None, streamwriter=None,
1498 incrementaldecoder=cls)
1499
1500# Register the previous decoder for testing.
1501# Disabled by default, tests will enable it.
1502codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1503
1504
Christian Heimes1a6387e2008-03-26 12:49:49 +00001505class StatefulIncrementalDecoderTest(unittest.TestCase):
1506 """
1507 Make sure the StatefulIncrementalDecoder actually works.
1508 """
1509
1510 test_cases = [
1511 # I=1, O=1 (fixed-length input == fixed-length output)
1512 (b'abcd', False, 'a.b.c.d.'),
1513 # I=0, O=0 (variable-length input, variable-length output)
1514 (b'oiabcd', True, 'abcd.'),
1515 # I=0, O=0 (should ignore extra periods)
1516 (b'oi...abcd...', True, 'abcd.'),
1517 # I=0, O=6 (variable-length input, fixed-length output)
1518 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1519 # I=2, O=6 (fixed-length input < fixed-length output)
1520 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1521 # I=6, O=3 (fixed-length input > fixed-length output)
1522 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1523 # I=0, then 3; O=29, then 15 (with longer output)
1524 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1525 'a----------------------------.' +
1526 'b----------------------------.' +
1527 'cde--------------------------.' +
1528 'abcdefghijabcde.' +
1529 'a.b------------.' +
1530 '.c.------------.' +
1531 'd.e------------.' +
1532 'k--------------.' +
1533 'l--------------.' +
1534 'm--------------.')
1535 ]
1536
Antoine Pitrou19690592009-06-12 20:14:08 +00001537 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001538 # Try a few one-shot test cases.
1539 for input, eof, output in self.test_cases:
1540 d = StatefulIncrementalDecoder()
1541 self.assertEquals(d.decode(input, eof), output)
1542
1543 # Also test an unfinished decode, followed by forcing EOF.
1544 d = StatefulIncrementalDecoder()
1545 self.assertEquals(d.decode(b'oiabcd'), '')
1546 self.assertEquals(d.decode(b'', 1), 'abcd.')
1547
1548class TextIOWrapperTest(unittest.TestCase):
1549
1550 def setUp(self):
1551 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1552 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001553 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001554
1555 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001556 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001557
Antoine Pitrou19690592009-06-12 20:14:08 +00001558 def test_constructor(self):
1559 r = self.BytesIO(b"\xc3\xa9\n\n")
1560 b = self.BufferedReader(r, 1000)
1561 t = self.TextIOWrapper(b)
1562 t.__init__(b, encoding="latin1", newline="\r\n")
1563 self.assertEquals(t.encoding, "latin1")
1564 self.assertEquals(t.line_buffering, False)
1565 t.__init__(b, encoding="utf8", line_buffering=True)
1566 self.assertEquals(t.encoding, "utf8")
1567 self.assertEquals(t.line_buffering, True)
1568 self.assertEquals("\xe9\n", t.readline())
1569 self.assertRaises(TypeError, t.__init__, b, newline=42)
1570 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1571
1572 def test_detach(self):
1573 r = self.BytesIO()
1574 b = self.BufferedWriter(r)
1575 t = self.TextIOWrapper(b)
1576 self.assertIs(t.detach(), b)
1577
1578 t = self.TextIOWrapper(b, encoding="ascii")
1579 t.write("howdy")
1580 self.assertFalse(r.getvalue())
1581 t.detach()
1582 self.assertEqual(r.getvalue(), b"howdy")
1583 self.assertRaises(ValueError, t.detach)
1584
1585 def test_repr(self):
1586 raw = self.BytesIO("hello".encode("utf-8"))
1587 b = self.BufferedReader(raw)
1588 t = self.TextIOWrapper(b, encoding="utf-8")
1589 modname = self.TextIOWrapper.__module__
1590 self.assertEqual(repr(t),
1591 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1592 raw.name = "dummy"
1593 self.assertEqual(repr(t),
1594 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1595 raw.name = b"dummy"
1596 self.assertEqual(repr(t),
1597 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1598
1599 def test_line_buffering(self):
1600 r = self.BytesIO()
1601 b = self.BufferedWriter(r, 1000)
1602 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1603 t.write("X")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001604 self.assertEquals(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001605 t.write("Y\nZ")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001606 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001607 t.write("A\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001608 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1609
Antoine Pitrou19690592009-06-12 20:14:08 +00001610 def test_encoding(self):
1611 # Check the encoding attribute is always set, and valid
1612 b = self.BytesIO()
1613 t = self.TextIOWrapper(b, encoding="utf8")
1614 self.assertEqual(t.encoding, "utf8")
1615 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001616 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001617 codecs.lookup(t.encoding)
1618
1619 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001620 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001621 b = self.BytesIO(b"abc\n\xff\n")
1622 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001623 self.assertRaises(UnicodeError, t.read)
1624 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001625 b = self.BytesIO(b"abc\n\xff\n")
1626 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001627 self.assertRaises(UnicodeError, t.read)
1628 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001629 b = self.BytesIO(b"abc\n\xff\n")
1630 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001631 self.assertEquals(t.read(), "abc\n\n")
1632 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001633 b = self.BytesIO(b"abc\n\xff\n")
1634 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
1635 self.assertEquals(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001636
Antoine Pitrou19690592009-06-12 20:14:08 +00001637 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001638 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001639 b = self.BytesIO()
1640 t = self.TextIOWrapper(b, encoding="ascii")
1641 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001642 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001643 b = self.BytesIO()
1644 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1645 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001646 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001647 b = self.BytesIO()
1648 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001649 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001650 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001651 t.flush()
1652 self.assertEquals(b.getvalue(), b"abcdef\n")
1653 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001654 b = self.BytesIO()
1655 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001656 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001657 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001658 t.flush()
1659 self.assertEquals(b.getvalue(), b"abc?def\n")
1660
Antoine Pitrou19690592009-06-12 20:14:08 +00001661 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001662 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1663
1664 tests = [
1665 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1666 [ '', input_lines ],
1667 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1668 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1669 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1670 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001671 encodings = (
1672 'utf-8', 'latin-1',
1673 'utf-16', 'utf-16-le', 'utf-16-be',
1674 'utf-32', 'utf-32-le', 'utf-32-be',
1675 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001676
1677 # Try a range of buffer sizes to test the case where \r is the last
1678 # character in TextIOWrapper._pending_line.
1679 for encoding in encodings:
1680 # XXX: str.encode() should return bytes
1681 data = bytes(''.join(input_lines).encode(encoding))
1682 for do_reads in (False, True):
1683 for bufsize in range(1, 10):
1684 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001685 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1686 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001687 encoding=encoding)
1688 if do_reads:
1689 got_lines = []
1690 while True:
1691 c2 = textio.read(2)
1692 if c2 == '':
1693 break
1694 self.assertEquals(len(c2), 2)
1695 got_lines.append(c2 + textio.readline())
1696 else:
1697 got_lines = list(textio)
1698
1699 for got_line, exp_line in zip(got_lines, exp_lines):
1700 self.assertEquals(got_line, exp_line)
1701 self.assertEquals(len(got_lines), len(exp_lines))
1702
Antoine Pitrou19690592009-06-12 20:14:08 +00001703 def test_newlines_input(self):
1704 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001705 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1706 for newline, expected in [
1707 (None, normalized.decode("ascii").splitlines(True)),
1708 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001709 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1710 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1711 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001712 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001713 buf = self.BytesIO(testdata)
1714 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001715 self.assertEquals(txt.readlines(), expected)
1716 txt.seek(0)
1717 self.assertEquals(txt.read(), "".join(expected))
1718
Antoine Pitrou19690592009-06-12 20:14:08 +00001719 def test_newlines_output(self):
1720 testdict = {
1721 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1722 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1723 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1724 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1725 }
1726 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1727 for newline, expected in tests:
1728 buf = self.BytesIO()
1729 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1730 txt.write("AAA\nB")
1731 txt.write("BB\nCCC\n")
1732 txt.write("X\rY\r\nZ")
1733 txt.flush()
1734 self.assertEquals(buf.closed, False)
1735 self.assertEquals(buf.getvalue(), expected)
1736
1737 def test_destructor(self):
1738 l = []
1739 base = self.BytesIO
1740 class MyBytesIO(base):
1741 def close(self):
1742 l.append(self.getvalue())
1743 base.close(self)
1744 b = MyBytesIO()
1745 t = self.TextIOWrapper(b, encoding="ascii")
1746 t.write("abc")
1747 del t
1748 support.gc_collect()
1749 self.assertEquals([b"abc"], l)
1750
1751 def test_override_destructor(self):
1752 record = []
1753 class MyTextIO(self.TextIOWrapper):
1754 def __del__(self):
1755 record.append(1)
1756 try:
1757 f = super(MyTextIO, self).__del__
1758 except AttributeError:
1759 pass
1760 else:
1761 f()
1762 def close(self):
1763 record.append(2)
1764 super(MyTextIO, self).close()
1765 def flush(self):
1766 record.append(3)
1767 super(MyTextIO, self).flush()
1768 b = self.BytesIO()
1769 t = MyTextIO(b, encoding="ascii")
1770 del t
1771 support.gc_collect()
1772 self.assertEqual(record, [1, 2, 3])
1773
1774 def test_error_through_destructor(self):
1775 # Test that the exception state is not modified by a destructor,
1776 # even if close() fails.
1777 rawio = self.CloseFailureIO()
1778 def f():
1779 self.TextIOWrapper(rawio).xyzzy
1780 with support.captured_output("stderr") as s:
1781 self.assertRaises(AttributeError, f)
1782 s = s.getvalue().strip()
1783 if s:
1784 # The destructor *may* have printed an unraisable error, check it
1785 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001786 self.assertTrue(s.startswith("Exception IOError: "), s)
1787 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001788
1789 # Systematic tests of the text I/O API
1790
Antoine Pitrou19690592009-06-12 20:14:08 +00001791 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001792 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1793 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00001794 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001795 f._CHUNK_SIZE = chunksize
Antoine Pitrou19690592009-06-12 20:14:08 +00001796 self.assertEquals(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001797 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001798 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001799 f._CHUNK_SIZE = chunksize
1800 self.assertEquals(f.tell(), 0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001801 self.assertEquals(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001802 cookie = f.tell()
1803 self.assertEquals(f.seek(0), 0)
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001804 self.assertEquals(f.read(None), "abc")
1805 f.seek(0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001806 self.assertEquals(f.read(2), "ab")
1807 self.assertEquals(f.read(1), "c")
1808 self.assertEquals(f.read(1), "")
1809 self.assertEquals(f.read(), "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001810 self.assertEquals(f.tell(), cookie)
1811 self.assertEquals(f.seek(0), 0)
1812 self.assertEquals(f.seek(0, 2), cookie)
Antoine Pitrou19690592009-06-12 20:14:08 +00001813 self.assertEquals(f.write("def"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001814 self.assertEquals(f.seek(cookie), cookie)
Antoine Pitrou19690592009-06-12 20:14:08 +00001815 self.assertEquals(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001816 if enc.startswith("utf"):
1817 self.multi_line_test(f, enc)
1818 f.close()
1819
1820 def multi_line_test(self, f, enc):
1821 f.seek(0)
1822 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00001823 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001824 wlines = []
1825 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1826 chars = []
1827 for i in range(size):
1828 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00001829 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001830 wlines.append((f.tell(), line))
1831 f.write(line)
1832 f.seek(0)
1833 rlines = []
1834 while True:
1835 pos = f.tell()
1836 line = f.readline()
1837 if not line:
1838 break
1839 rlines.append((pos, line))
1840 self.assertEquals(rlines, wlines)
1841
Antoine Pitrou19690592009-06-12 20:14:08 +00001842 def test_telling(self):
1843 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001844 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001845 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001846 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001847 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001848 p2 = f.tell()
1849 f.seek(0)
1850 self.assertEquals(f.tell(), p0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001851 self.assertEquals(f.readline(), "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001852 self.assertEquals(f.tell(), p1)
Antoine Pitrou19690592009-06-12 20:14:08 +00001853 self.assertEquals(f.readline(), "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001854 self.assertEquals(f.tell(), p2)
1855 f.seek(0)
1856 for line in f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001857 self.assertEquals(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001858 self.assertRaises(IOError, f.tell)
1859 self.assertEquals(f.tell(), p2)
1860 f.close()
1861
Antoine Pitrou19690592009-06-12 20:14:08 +00001862 def test_seeking(self):
1863 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00001864 prefix_size = chunk_size - 2
1865 u_prefix = "a" * prefix_size
1866 prefix = bytes(u_prefix.encode("utf-8"))
1867 self.assertEquals(len(u_prefix), len(prefix))
1868 u_suffix = "\u8888\n"
1869 suffix = bytes(u_suffix.encode("utf-8"))
1870 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00001871 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001872 f.write(line*2)
1873 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001874 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001875 s = f.read(prefix_size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001876 self.assertEquals(s, prefix.decode("ascii"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001877 self.assertEquals(f.tell(), prefix_size)
1878 self.assertEquals(f.readline(), u_suffix)
1879
Antoine Pitrou19690592009-06-12 20:14:08 +00001880 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001881 # Regression test for a specific bug
1882 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00001883 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001884 f.write(data)
1885 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001886 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001887 f._CHUNK_SIZE # Just test that it exists
1888 f._CHUNK_SIZE = 2
1889 f.readline()
1890 f.tell()
1891
Antoine Pitrou19690592009-06-12 20:14:08 +00001892 def test_seek_and_tell(self):
1893 #Test seek/tell using the StatefulIncrementalDecoder.
1894 # Make test faster by doing smaller seeks
1895 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00001896
Antoine Pitrou19690592009-06-12 20:14:08 +00001897 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001898 """Tell/seek to various points within a data stream and ensure
1899 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00001900 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001901 f.write(data)
1902 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001903 f = self.open(support.TESTFN, encoding='test_decoder')
1904 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00001905 decoded = f.read()
1906 f.close()
1907
1908 for i in range(min_pos, len(decoded) + 1): # seek positions
1909 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00001910 f = self.open(support.TESTFN, encoding='test_decoder')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001911 self.assertEquals(f.read(i), decoded[:i])
1912 cookie = f.tell()
1913 self.assertEquals(f.read(j), decoded[i:i + j])
1914 f.seek(cookie)
1915 self.assertEquals(f.read(), decoded[i:])
1916 f.close()
1917
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001918 # Enable the test decoder.
1919 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00001920
1921 # Run the tests.
1922 try:
1923 # Try each test case.
1924 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00001925 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001926
1927 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00001928 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1929 offset = CHUNK_SIZE - len(input)//2
1930 prefix = b'.'*offset
1931 # Don't bother seeking into the prefix (takes too long).
1932 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00001933 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001934
1935 # Ensure our test decoder won't interfere with subsequent tests.
1936 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001937 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00001938
Antoine Pitrou19690592009-06-12 20:14:08 +00001939 def test_encoded_writes(self):
1940 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001941 tests = ("utf-16",
1942 "utf-16-le",
1943 "utf-16-be",
1944 "utf-32",
1945 "utf-32-le",
1946 "utf-32-be")
1947 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001948 buf = self.BytesIO()
1949 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001950 # Check if the BOM is written only once (see issue1753).
1951 f.write(data)
1952 f.write(data)
1953 f.seek(0)
1954 self.assertEquals(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00001955 f.seek(0)
1956 self.assertEquals(f.read(), data * 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001957 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1958
Antoine Pitrou19690592009-06-12 20:14:08 +00001959 def test_unreadable(self):
1960 class UnReadable(self.BytesIO):
1961 def readable(self):
1962 return False
1963 txt = self.TextIOWrapper(UnReadable())
1964 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001965
Antoine Pitrou19690592009-06-12 20:14:08 +00001966 def test_read_one_by_one(self):
1967 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001968 reads = ""
1969 while True:
1970 c = txt.read(1)
1971 if not c:
1972 break
1973 reads += c
1974 self.assertEquals(reads, "AA\nBB")
1975
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001976 def test_readlines(self):
1977 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
1978 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
1979 txt.seek(0)
1980 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
1981 txt.seek(0)
1982 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
1983
Christian Heimes1a6387e2008-03-26 12:49:49 +00001984 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00001985 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001986 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00001987 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001988 reads = ""
1989 while True:
1990 c = txt.read(128)
1991 if not c:
1992 break
1993 reads += c
1994 self.assertEquals(reads, "A"*127+"\nB")
1995
1996 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001997 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001998
1999 # read one char at a time
2000 reads = ""
2001 while True:
2002 c = txt.read(1)
2003 if not c:
2004 break
2005 reads += c
2006 self.assertEquals(reads, self.normalized)
2007
2008 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002009 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002010 txt._CHUNK_SIZE = 4
2011
2012 reads = ""
2013 while True:
2014 c = txt.read(4)
2015 if not c:
2016 break
2017 reads += c
2018 self.assertEquals(reads, self.normalized)
2019
2020 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002021 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002022 txt._CHUNK_SIZE = 4
2023
2024 reads = txt.read(4)
2025 reads += txt.read(4)
2026 reads += txt.readline()
2027 reads += txt.readline()
2028 reads += txt.readline()
2029 self.assertEquals(reads, self.normalized)
2030
2031 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002032 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002033 txt._CHUNK_SIZE = 4
2034
2035 reads = txt.read(4)
2036 reads += txt.read()
2037 self.assertEquals(reads, self.normalized)
2038
2039 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002040 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002041 txt._CHUNK_SIZE = 4
2042
2043 reads = txt.read(4)
2044 pos = txt.tell()
2045 txt.seek(0)
2046 txt.seek(pos)
2047 self.assertEquals(txt.read(4), "BBB\n")
2048
2049 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002050 buffer = self.BytesIO(self.testdata)
2051 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002052
2053 self.assertEqual(buffer.seekable(), txt.seekable())
2054
Antoine Pitrou19690592009-06-12 20:14:08 +00002055 @unittest.skip("Issue #6213 with incremental encoders")
2056 def test_append_bom(self):
2057 # The BOM is not written again when appending to a non-empty file
2058 filename = support.TESTFN
2059 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2060 with self.open(filename, 'w', encoding=charset) as f:
2061 f.write('aaa')
2062 pos = f.tell()
2063 with self.open(filename, 'rb') as f:
2064 self.assertEquals(f.read(), 'aaa'.encode(charset))
2065
2066 with self.open(filename, 'a', encoding=charset) as f:
2067 f.write('xxx')
2068 with self.open(filename, 'rb') as f:
2069 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
2070
2071 @unittest.skip("Issue #6213 with incremental encoders")
2072 def test_seek_bom(self):
2073 # Same test, but when seeking manually
2074 filename = support.TESTFN
2075 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2076 with self.open(filename, 'w', encoding=charset) as f:
2077 f.write('aaa')
2078 pos = f.tell()
2079 with self.open(filename, 'r+', encoding=charset) as f:
2080 f.seek(pos)
2081 f.write('zzz')
2082 f.seek(0)
2083 f.write('bbb')
2084 with self.open(filename, 'rb') as f:
2085 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2086
2087 def test_errors_property(self):
2088 with self.open(support.TESTFN, "w") as f:
2089 self.assertEqual(f.errors, "strict")
2090 with self.open(support.TESTFN, "w", errors="replace") as f:
2091 self.assertEqual(f.errors, "replace")
2092
2093
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002094 def test_threads_write(self):
2095 # Issue6750: concurrent writes could duplicate data
2096 event = threading.Event()
2097 with self.open(support.TESTFN, "w", buffering=1) as f:
2098 def run(n):
2099 text = "Thread%03d\n" % n
2100 event.wait()
2101 f.write(text)
2102 threads = [threading.Thread(target=lambda n=x: run(n))
2103 for x in range(20)]
2104 for t in threads:
2105 t.start()
2106 time.sleep(0.02)
2107 event.set()
2108 for t in threads:
2109 t.join()
2110 with self.open(support.TESTFN) as f:
2111 content = f.read()
2112 for n in range(20):
2113 self.assertEquals(content.count("Thread%03d\n" % n), 1)
2114
Antoine Pitrou19690592009-06-12 20:14:08 +00002115class CTextIOWrapperTest(TextIOWrapperTest):
2116
2117 def test_initialization(self):
2118 r = self.BytesIO(b"\xc3\xa9\n\n")
2119 b = self.BufferedReader(r, 1000)
2120 t = self.TextIOWrapper(b)
2121 self.assertRaises(TypeError, t.__init__, b, newline=42)
2122 self.assertRaises(ValueError, t.read)
2123 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2124 self.assertRaises(ValueError, t.read)
2125
2126 def test_garbage_collection(self):
2127 # C TextIOWrapper objects are collected, and collecting them flushes
2128 # all data to disk.
2129 # The Python version has __del__, so it ends in gc.garbage instead.
2130 rawio = io.FileIO(support.TESTFN, "wb")
2131 b = self.BufferedWriter(rawio)
2132 t = self.TextIOWrapper(b, encoding="ascii")
2133 t.write("456def")
2134 t.x = t
2135 wr = weakref.ref(t)
2136 del t
2137 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002138 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002139 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002140 self.assertEqual(f.read(), b"456def")
2141
2142class PyTextIOWrapperTest(TextIOWrapperTest):
2143 pass
2144
2145
2146class IncrementalNewlineDecoderTest(unittest.TestCase):
2147
2148 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002149 # UTF-8 specific tests for a newline decoder
2150 def _check_decode(b, s, **kwargs):
2151 # We exercise getstate() / setstate() as well as decode()
2152 state = decoder.getstate()
2153 self.assertEquals(decoder.decode(b, **kwargs), s)
2154 decoder.setstate(state)
2155 self.assertEquals(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002156
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002157 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002158
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002159 _check_decode(b'\xe8', "")
2160 _check_decode(b'\xa2', "")
2161 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002162
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002163 _check_decode(b'\xe8', "")
2164 _check_decode(b'\xa2', "")
2165 _check_decode(b'\x88', "\u8888")
2166
2167 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002168 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2169
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002170 decoder.reset()
2171 _check_decode(b'\n', "\n")
2172 _check_decode(b'\r', "")
2173 _check_decode(b'', "\n", final=True)
2174 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002175
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002176 _check_decode(b'\r', "")
2177 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002178
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002179 _check_decode(b'\r\r\n', "\n\n")
2180 _check_decode(b'\r', "")
2181 _check_decode(b'\r', "\n")
2182 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002183
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002184 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2185 _check_decode(b'\xe8\xa2\x88', "\u8888")
2186 _check_decode(b'\n', "\n")
2187 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2188 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002189
Antoine Pitrou19690592009-06-12 20:14:08 +00002190 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002191 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002192 if encoding is not None:
2193 encoder = codecs.getincrementalencoder(encoding)()
2194 def _decode_bytewise(s):
2195 # Decode one byte at a time
2196 for b in encoder.encode(s):
2197 result.append(decoder.decode(b))
2198 else:
2199 encoder = None
2200 def _decode_bytewise(s):
2201 # Decode one char at a time
2202 for c in s:
2203 result.append(decoder.decode(c))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002204 self.assertEquals(decoder.newlines, None)
2205 _decode_bytewise("abc\n\r")
2206 self.assertEquals(decoder.newlines, '\n')
2207 _decode_bytewise("\nabc")
2208 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2209 _decode_bytewise("abc\r")
2210 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2211 _decode_bytewise("abc")
2212 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2213 _decode_bytewise("abc\r")
2214 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2215 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002216 input = "abc"
2217 if encoder is not None:
2218 encoder.reset()
2219 input = encoder.encode(input)
2220 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002221 self.assertEquals(decoder.newlines, None)
2222
2223 def test_newline_decoder(self):
2224 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002225 # None meaning the IncrementalNewlineDecoder takes unicode input
2226 # rather than bytes input
2227 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002228 'utf-16', 'utf-16-le', 'utf-16-be',
2229 'utf-32', 'utf-32-le', 'utf-32-be',
2230 )
2231 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002232 decoder = enc and codecs.getincrementaldecoder(enc)()
2233 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2234 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002235 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002236 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2237 self.check_newline_decoding_utf8(decoder)
2238
2239 def test_newline_bytes(self):
2240 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2241 def _check(dec):
2242 self.assertEquals(dec.newlines, None)
2243 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2244 self.assertEquals(dec.newlines, None)
2245 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2246 self.assertEquals(dec.newlines, None)
2247 dec = self.IncrementalNewlineDecoder(None, translate=False)
2248 _check(dec)
2249 dec = self.IncrementalNewlineDecoder(None, translate=True)
2250 _check(dec)
2251
2252class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2253 pass
2254
2255class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2256 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002257
Christian Heimes1a6387e2008-03-26 12:49:49 +00002258
2259# XXX Tests for open()
2260
2261class MiscIOTest(unittest.TestCase):
2262
Benjamin Petersonad100c32008-11-20 22:06:22 +00002263 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002264 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002265
Antoine Pitrou19690592009-06-12 20:14:08 +00002266 def test___all__(self):
2267 for name in self.io.__all__:
2268 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002269 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002270 if name == "open":
2271 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002272 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002273 self.assertTrue(issubclass(obj, Exception), name)
2274 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002275 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002276
Benjamin Petersonad100c32008-11-20 22:06:22 +00002277 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002278 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002279 self.assertEquals(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002280 f.close()
2281
Antoine Pitrou19690592009-06-12 20:14:08 +00002282 f = self.open(support.TESTFN, "U")
2283 self.assertEquals(f.name, support.TESTFN)
2284 self.assertEquals(f.buffer.name, support.TESTFN)
2285 self.assertEquals(f.buffer.raw.name, support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002286 self.assertEquals(f.mode, "U")
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002287 self.assertEquals(f.buffer.mode, "rb")
2288 self.assertEquals(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002289 f.close()
2290
Antoine Pitrou19690592009-06-12 20:14:08 +00002291 f = self.open(support.TESTFN, "w+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002292 self.assertEquals(f.mode, "w+")
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002293 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2294 self.assertEquals(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002295
Antoine Pitrou19690592009-06-12 20:14:08 +00002296 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002297 self.assertEquals(g.mode, "wb")
2298 self.assertEquals(g.raw.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002299 self.assertEquals(g.name, f.fileno())
2300 self.assertEquals(g.raw.name, f.fileno())
2301 f.close()
2302 g.close()
2303
Antoine Pitrou19690592009-06-12 20:14:08 +00002304 def test_io_after_close(self):
2305 for kwargs in [
2306 {"mode": "w"},
2307 {"mode": "wb"},
2308 {"mode": "w", "buffering": 1},
2309 {"mode": "w", "buffering": 2},
2310 {"mode": "wb", "buffering": 0},
2311 {"mode": "r"},
2312 {"mode": "rb"},
2313 {"mode": "r", "buffering": 1},
2314 {"mode": "r", "buffering": 2},
2315 {"mode": "rb", "buffering": 0},
2316 {"mode": "w+"},
2317 {"mode": "w+b"},
2318 {"mode": "w+", "buffering": 1},
2319 {"mode": "w+", "buffering": 2},
2320 {"mode": "w+b", "buffering": 0},
2321 ]:
2322 f = self.open(support.TESTFN, **kwargs)
2323 f.close()
2324 self.assertRaises(ValueError, f.flush)
2325 self.assertRaises(ValueError, f.fileno)
2326 self.assertRaises(ValueError, f.isatty)
2327 self.assertRaises(ValueError, f.__iter__)
2328 if hasattr(f, "peek"):
2329 self.assertRaises(ValueError, f.peek, 1)
2330 self.assertRaises(ValueError, f.read)
2331 if hasattr(f, "read1"):
2332 self.assertRaises(ValueError, f.read1, 1024)
2333 if hasattr(f, "readinto"):
2334 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2335 self.assertRaises(ValueError, f.readline)
2336 self.assertRaises(ValueError, f.readlines)
2337 self.assertRaises(ValueError, f.seek, 0)
2338 self.assertRaises(ValueError, f.tell)
2339 self.assertRaises(ValueError, f.truncate)
2340 self.assertRaises(ValueError, f.write,
2341 b"" if "b" in kwargs['mode'] else "")
2342 self.assertRaises(ValueError, f.writelines, [])
2343 self.assertRaises(ValueError, next, f)
2344
2345 def test_blockingioerror(self):
2346 # Various BlockingIOError issues
2347 self.assertRaises(TypeError, self.BlockingIOError)
2348 self.assertRaises(TypeError, self.BlockingIOError, 1)
2349 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2350 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2351 b = self.BlockingIOError(1, "")
2352 self.assertEqual(b.characters_written, 0)
2353 class C(unicode):
2354 pass
2355 c = C("")
2356 b = self.BlockingIOError(1, c)
2357 c.b = b
2358 b.c = c
2359 wr = weakref.ref(c)
2360 del c, b
2361 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002362 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002363
2364 def test_abcs(self):
2365 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002366 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2367 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2368 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2369 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002370
2371 def _check_abc_inheritance(self, abcmodule):
2372 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002373 self.assertIsInstance(f, abcmodule.IOBase)
2374 self.assertIsInstance(f, abcmodule.RawIOBase)
2375 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2376 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002377 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002378 self.assertIsInstance(f, abcmodule.IOBase)
2379 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2380 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2381 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002382 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002383 self.assertIsInstance(f, abcmodule.IOBase)
2384 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2385 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2386 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002387
2388 def test_abc_inheritance(self):
2389 # Test implementations inherit from their respective ABCs
2390 self._check_abc_inheritance(self)
2391
2392 def test_abc_inheritance_official(self):
2393 # Test implementations inherit from the official ABCs of the
2394 # baseline "io" module.
2395 self._check_abc_inheritance(io)
2396
2397class CMiscIOTest(MiscIOTest):
2398 io = io
2399
2400class PyMiscIOTest(MiscIOTest):
2401 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002402
Christian Heimes1a6387e2008-03-26 12:49:49 +00002403def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00002404 tests = (CIOTest, PyIOTest,
2405 CBufferedReaderTest, PyBufferedReaderTest,
2406 CBufferedWriterTest, PyBufferedWriterTest,
2407 CBufferedRWPairTest, PyBufferedRWPairTest,
2408 CBufferedRandomTest, PyBufferedRandomTest,
2409 StatefulIncrementalDecoderTest,
2410 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2411 CTextIOWrapperTest, PyTextIOWrapperTest,
2412 CMiscIOTest, PyMiscIOTest,
2413 )
2414
2415 # Put the namespaces of the IO module we are testing and some useful mock
2416 # classes in the __dict__ of each test.
2417 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
2418 MockNonBlockWriterIO)
2419 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2420 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
2421 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
2422 globs = globals()
2423 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2424 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2425 # Avoid turning open into a bound method.
2426 py_io_ns["open"] = pyio.OpenWrapper
2427 for test in tests:
2428 if test.__name__.startswith("C"):
2429 for name, obj in c_io_ns.items():
2430 setattr(test, name, obj)
2431 elif test.__name__.startswith("Py"):
2432 for name, obj in py_io_ns.items():
2433 setattr(test, name, obj)
2434
2435 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002436
2437if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00002438 test_main()