blob: b91c0a128e8842b3153a021f0b1df1db6eed67f8 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import threading
30import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000031import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000032import warnings
33import weakref
34import gc
35import abc
36from itertools import chain, cycle, count
37from collections import deque
38from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000039
40import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
43
44__metaclass__ = type
45bytes = support.py3k_bytes
46
47def _default_chunk_size():
48 """Get the default TextIOWrapper chunk size"""
49 with io.open(__file__, "r", encoding="latin1") as f:
50 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000051
52
Antoine Pitrou19690592009-06-12 20:14:08 +000053class MockRawIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +000054
55 def __init__(self, read_stack=()):
56 self._read_stack = list(read_stack)
57 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000058 self._reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000059
60 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +000061 self._reads += 1
Christian Heimes1a6387e2008-03-26 12:49:49 +000062 try:
63 return self._read_stack.pop(0)
64 except:
65 return b""
66
67 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000068 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000069 return len(b)
70
71 def writable(self):
72 return True
73
74 def fileno(self):
75 return 42
76
77 def readable(self):
78 return True
79
80 def seekable(self):
81 return True
82
83 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000084 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000085
86 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000087 return 0 # same comment as above
88
89 def readinto(self, buf):
90 self._reads += 1
91 max_len = len(buf)
92 try:
93 data = self._read_stack[0]
94 except IndexError:
95 return 0
96 if data is None:
97 del self._read_stack[0]
98 return None
99 n = len(data)
100 if len(data) <= max_len:
101 del self._read_stack[0]
102 buf[:n] = data
103 return n
104 else:
105 buf[:] = data[:max_len]
106 self._read_stack[0] = data[max_len:]
107 return max_len
108
109 def truncate(self, pos=None):
110 return pos
111
112class CMockRawIO(MockRawIO, io.RawIOBase):
113 pass
114
115class PyMockRawIO(MockRawIO, pyio.RawIOBase):
116 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000117
118
Antoine Pitrou19690592009-06-12 20:14:08 +0000119class MisbehavedRawIO(MockRawIO):
120 def write(self, b):
121 return MockRawIO.write(self, b) * 2
122
123 def read(self, n=None):
124 return MockRawIO.read(self, n) * 2
125
126 def seek(self, pos, whence):
127 return -123
128
129 def tell(self):
130 return -456
131
132 def readinto(self, buf):
133 MockRawIO.readinto(self, buf)
134 return len(buf) * 5
135
136class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
137 pass
138
139class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
140 pass
141
142
143class CloseFailureIO(MockRawIO):
144 closed = 0
145
146 def close(self):
147 if not self.closed:
148 self.closed = 1
149 raise IOError
150
151class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
152 pass
153
154class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
155 pass
156
157
158class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000159
160 def __init__(self, data):
161 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000162 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000163
164 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000165 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000166 self.read_history.append(None if res is None else len(res))
167 return res
168
Antoine Pitrou19690592009-06-12 20:14:08 +0000169 def readinto(self, b):
170 res = super(MockFileIO, self).readinto(b)
171 self.read_history.append(res)
172 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000173
Antoine Pitrou19690592009-06-12 20:14:08 +0000174class CMockFileIO(MockFileIO, io.BytesIO):
175 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000176
Antoine Pitrou19690592009-06-12 20:14:08 +0000177class PyMockFileIO(MockFileIO, pyio.BytesIO):
178 pass
179
180
181class MockNonBlockWriterIO:
182
183 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000184 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000185 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000186
Antoine Pitrou19690592009-06-12 20:14:08 +0000187 def pop_written(self):
188 s = b"".join(self._write_stack)
189 self._write_stack[:] = []
190 return s
191
192 def block_on(self, char):
193 """Block when a given char is encountered."""
194 self._blocker_char = char
195
196 def readable(self):
197 return True
198
199 def seekable(self):
200 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000201
202 def writable(self):
203 return True
204
Antoine Pitrou19690592009-06-12 20:14:08 +0000205 def write(self, b):
206 b = bytes(b)
207 n = -1
208 if self._blocker_char:
209 try:
210 n = b.index(self._blocker_char)
211 except ValueError:
212 pass
213 else:
214 self._blocker_char = None
215 self._write_stack.append(b[:n])
216 raise self.BlockingIOError(0, "test blocking", n)
217 self._write_stack.append(b)
218 return len(b)
219
220class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
221 BlockingIOError = io.BlockingIOError
222
223class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
224 BlockingIOError = pyio.BlockingIOError
225
Christian Heimes1a6387e2008-03-26 12:49:49 +0000226
227class IOTest(unittest.TestCase):
228
Antoine Pitrou19690592009-06-12 20:14:08 +0000229 def setUp(self):
230 support.unlink(support.TESTFN)
231
Christian Heimes1a6387e2008-03-26 12:49:49 +0000232 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000233 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000234
235 def write_ops(self, f):
236 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000237 f.truncate(0)
238 self.assertEqual(f.tell(), 5)
239 f.seek(0)
240
241 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000242 self.assertEqual(f.seek(0), 0)
243 self.assertEqual(f.write(b"Hello."), 6)
244 self.assertEqual(f.tell(), 6)
245 self.assertEqual(f.seek(-1, 1), 5)
246 self.assertEqual(f.tell(), 5)
247 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
248 self.assertEqual(f.seek(0), 0)
249 self.assertEqual(f.write(b"h"), 1)
250 self.assertEqual(f.seek(-1, 2), 13)
251 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000252
Christian Heimes1a6387e2008-03-26 12:49:49 +0000253 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000254 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000255 self.assertRaises(TypeError, f.seek, 0.0)
256
257 def read_ops(self, f, buffered=False):
258 data = f.read(5)
259 self.assertEqual(data, b"hello")
260 data = bytearray(data)
261 self.assertEqual(f.readinto(data), 5)
262 self.assertEqual(data, b" worl")
263 self.assertEqual(f.readinto(data), 2)
264 self.assertEqual(len(data), 5)
265 self.assertEqual(data[:2], b"d\n")
266 self.assertEqual(f.seek(0), 0)
267 self.assertEqual(f.read(20), b"hello world\n")
268 self.assertEqual(f.read(1), b"")
269 self.assertEqual(f.readinto(bytearray(b"x")), 0)
270 self.assertEqual(f.seek(-6, 2), 6)
271 self.assertEqual(f.read(5), b"world")
272 self.assertEqual(f.read(0), b"")
273 self.assertEqual(f.readinto(bytearray()), 0)
274 self.assertEqual(f.seek(-6, 1), 5)
275 self.assertEqual(f.read(5), b" worl")
276 self.assertEqual(f.tell(), 10)
277 self.assertRaises(TypeError, f.seek, 0.0)
278 if buffered:
279 f.seek(0)
280 self.assertEqual(f.read(), b"hello world\n")
281 f.seek(6)
282 self.assertEqual(f.read(), b"world\n")
283 self.assertEqual(f.read(), b"")
284
285 LARGE = 2**31
286
287 def large_file_ops(self, f):
288 assert f.readable()
289 assert f.writable()
290 self.assertEqual(f.seek(self.LARGE), self.LARGE)
291 self.assertEqual(f.tell(), self.LARGE)
292 self.assertEqual(f.write(b"xxx"), 3)
293 self.assertEqual(f.tell(), self.LARGE + 3)
294 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
295 self.assertEqual(f.truncate(), self.LARGE + 2)
296 self.assertEqual(f.tell(), self.LARGE + 2)
297 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
298 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000299 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000300 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
301 self.assertEqual(f.seek(-1, 2), self.LARGE)
302 self.assertEqual(f.read(2), b"x")
303
Antoine Pitrou19690592009-06-12 20:14:08 +0000304 def test_invalid_operations(self):
305 # Try writing on a file opened in read mode and vice-versa.
306 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000307 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000308 self.assertRaises(IOError, fp.read)
309 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000310 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000311 self.assertRaises(IOError, fp.write, b"blah")
312 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000313 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000314 self.assertRaises(IOError, fp.write, "blah")
315 self.assertRaises(IOError, fp.writelines, ["blah\n"])
316
Christian Heimes1a6387e2008-03-26 12:49:49 +0000317 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000318 with self.open(support.TESTFN, "wb", buffering=0) as f:
319 self.assertEqual(f.readable(), False)
320 self.assertEqual(f.writable(), True)
321 self.assertEqual(f.seekable(), True)
322 self.write_ops(f)
323 with self.open(support.TESTFN, "rb", buffering=0) as f:
324 self.assertEqual(f.readable(), True)
325 self.assertEqual(f.writable(), False)
326 self.assertEqual(f.seekable(), True)
327 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000328
329 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000330 with self.open(support.TESTFN, "wb") as f:
331 self.assertEqual(f.readable(), False)
332 self.assertEqual(f.writable(), True)
333 self.assertEqual(f.seekable(), True)
334 self.write_ops(f)
335 with self.open(support.TESTFN, "rb") as f:
336 self.assertEqual(f.readable(), True)
337 self.assertEqual(f.writable(), False)
338 self.assertEqual(f.seekable(), True)
339 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000340
341 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000342 with self.open(support.TESTFN, "wb") as f:
343 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
344 with self.open(support.TESTFN, "rb") as f:
345 self.assertEqual(f.readline(), b"abc\n")
346 self.assertEqual(f.readline(10), b"def\n")
347 self.assertEqual(f.readline(2), b"xy")
348 self.assertEqual(f.readline(4), b"zzy\n")
349 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000350 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000351 self.assertRaises(TypeError, f.readline, 5.3)
352 with self.open(support.TESTFN, "r") as f:
353 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000354
355 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000356 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000357 self.write_ops(f)
358 data = f.getvalue()
359 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000360 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000361 self.read_ops(f, True)
362
363 def test_large_file_ops(self):
364 # On Windows and Mac OSX this test comsumes large resources; It takes
365 # a long time to build the >2GB file and takes >2GB of disk space
366 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000367 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
368 if not support.is_resource_enabled("largefile"):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000369 print("\nTesting large file ops skipped on %s." % sys.platform,
370 file=sys.stderr)
371 print("It requires %d bytes and a long time." % self.LARGE,
372 file=sys.stderr)
373 print("Use 'regrtest.py -u largefile test_io' to run it.",
374 file=sys.stderr)
375 return
Antoine Pitrou19690592009-06-12 20:14:08 +0000376 with self.open(support.TESTFN, "w+b", 0) as f:
377 self.large_file_ops(f)
378 with self.open(support.TESTFN, "w+b") as f:
379 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000380
381 def test_with_open(self):
382 for bufsize in (0, 1, 100):
383 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000384 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000385 f.write(b"xxx")
386 self.assertEqual(f.closed, True)
387 f = None
388 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000389 with self.open(support.TESTFN, "wb", bufsize) as f:
Senthil Kumarance8e33a2010-01-08 19:04:16 +0000390 1/0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000391 except ZeroDivisionError:
392 self.assertEqual(f.closed, True)
393 else:
Senthil Kumarance8e33a2010-01-08 19:04:16 +0000394 self.fail("1/0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000395
Antoine Pitroue741cc62009-01-21 00:45:36 +0000396 # issue 5008
397 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000398 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000399 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000400 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000401 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000402 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000403 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000404 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000405 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000406
Christian Heimes1a6387e2008-03-26 12:49:49 +0000407 def test_destructor(self):
408 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000409 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000410 def __del__(self):
411 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000412 try:
413 f = super(MyFileIO, self).__del__
414 except AttributeError:
415 pass
416 else:
417 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000418 def close(self):
419 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000420 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000421 def flush(self):
422 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000423 super(MyFileIO, self).flush()
424 f = MyFileIO(support.TESTFN, "wb")
425 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000426 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000427 support.gc_collect()
428 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000429 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000430 self.assertEqual(f.read(), b"xxx")
431
432 def _check_base_destructor(self, base):
433 record = []
434 class MyIO(base):
435 def __init__(self):
436 # This exercises the availability of attributes on object
437 # destruction.
438 # (in the C version, close() is called by the tp_dealloc
439 # function, not by __del__)
440 self.on_del = 1
441 self.on_close = 2
442 self.on_flush = 3
443 def __del__(self):
444 record.append(self.on_del)
445 try:
446 f = super(MyIO, self).__del__
447 except AttributeError:
448 pass
449 else:
450 f()
451 def close(self):
452 record.append(self.on_close)
453 super(MyIO, self).close()
454 def flush(self):
455 record.append(self.on_flush)
456 super(MyIO, self).flush()
457 f = MyIO()
458 del f
459 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000460 self.assertEqual(record, [1, 2, 3])
461
Antoine Pitrou19690592009-06-12 20:14:08 +0000462 def test_IOBase_destructor(self):
463 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000464
Antoine Pitrou19690592009-06-12 20:14:08 +0000465 def test_RawIOBase_destructor(self):
466 self._check_base_destructor(self.RawIOBase)
467
468 def test_BufferedIOBase_destructor(self):
469 self._check_base_destructor(self.BufferedIOBase)
470
471 def test_TextIOBase_destructor(self):
472 self._check_base_destructor(self.TextIOBase)
473
474 def test_close_flushes(self):
475 with self.open(support.TESTFN, "wb") as f:
476 f.write(b"xxx")
477 with self.open(support.TESTFN, "rb") as f:
478 self.assertEqual(f.read(), b"xxx")
479
480 def test_array_writes(self):
481 a = array.array(b'i', range(10))
482 n = len(a.tostring())
483 with self.open(support.TESTFN, "wb", 0) as f:
484 self.assertEqual(f.write(a), n)
485 with self.open(support.TESTFN, "wb") as f:
486 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000487
488 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000489 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000490 closefd=False)
491
Antoine Pitrou19690592009-06-12 20:14:08 +0000492 def test_read_closed(self):
493 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000494 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000495 with self.open(support.TESTFN, "r") as f:
496 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000497 self.assertEqual(file.read(), "egg\n")
498 file.seek(0)
499 file.close()
500 self.assertRaises(ValueError, file.read)
501
502 def test_no_closefd_with_filename(self):
503 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000504 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000505
506 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000507 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000508 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000509 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000510 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000511 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000512 self.assertEqual(file.buffer.raw.closefd, False)
513
Antoine Pitrou19690592009-06-12 20:14:08 +0000514 def test_garbage_collection(self):
515 # FileIO objects are collected, and collecting them flushes
516 # all data to disk.
517 f = self.FileIO(support.TESTFN, "wb")
518 f.write(b"abcxxx")
519 f.f = f
520 wr = weakref.ref(f)
521 del f
522 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000523 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000524 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000525 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000526
Antoine Pitrou19690592009-06-12 20:14:08 +0000527 def test_unbounded_file(self):
528 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
529 zero = "/dev/zero"
530 if not os.path.exists(zero):
531 self.skipTest("{0} does not exist".format(zero))
532 if sys.maxsize > 0x7FFFFFFF:
533 self.skipTest("test can only run in a 32-bit address space")
534 if support.real_max_memuse < support._2G:
535 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000536 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000537 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000538 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000539 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000540 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000541 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000542
Antoine Pitrou19690592009-06-12 20:14:08 +0000543class CIOTest(IOTest):
544 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000545
Antoine Pitrou19690592009-06-12 20:14:08 +0000546class PyIOTest(IOTest):
547 test_array_writes = unittest.skip(
548 "len(array.array) returns number of elements rather than bytelength"
549 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000550
551
Antoine Pitrou19690592009-06-12 20:14:08 +0000552class CommonBufferedTests:
553 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
554
555 def test_detach(self):
556 raw = self.MockRawIO()
557 buf = self.tp(raw)
558 self.assertIs(buf.detach(), raw)
559 self.assertRaises(ValueError, buf.detach)
560
561 def test_fileno(self):
562 rawio = self.MockRawIO()
563 bufio = self.tp(rawio)
564
565 self.assertEquals(42, bufio.fileno())
566
567 def test_no_fileno(self):
568 # XXX will we always have fileno() function? If so, kill
569 # this test. Else, write it.
570 pass
571
572 def test_invalid_args(self):
573 rawio = self.MockRawIO()
574 bufio = self.tp(rawio)
575 # Invalid whence
576 self.assertRaises(ValueError, bufio.seek, 0, -1)
577 self.assertRaises(ValueError, bufio.seek, 0, 3)
578
579 def test_override_destructor(self):
580 tp = self.tp
581 record = []
582 class MyBufferedIO(tp):
583 def __del__(self):
584 record.append(1)
585 try:
586 f = super(MyBufferedIO, self).__del__
587 except AttributeError:
588 pass
589 else:
590 f()
591 def close(self):
592 record.append(2)
593 super(MyBufferedIO, self).close()
594 def flush(self):
595 record.append(3)
596 super(MyBufferedIO, self).flush()
597 rawio = self.MockRawIO()
598 bufio = MyBufferedIO(rawio)
599 writable = bufio.writable()
600 del bufio
601 support.gc_collect()
602 if writable:
603 self.assertEqual(record, [1, 2, 3])
604 else:
605 self.assertEqual(record, [1, 2])
606
607 def test_context_manager(self):
608 # Test usability as a context manager
609 rawio = self.MockRawIO()
610 bufio = self.tp(rawio)
611 def _with():
612 with bufio:
613 pass
614 _with()
615 # bufio should now be closed, and using it a second time should raise
616 # a ValueError.
617 self.assertRaises(ValueError, _with)
618
619 def test_error_through_destructor(self):
620 # Test that the exception state is not modified by a destructor,
621 # even if close() fails.
622 rawio = self.CloseFailureIO()
623 def f():
624 self.tp(rawio).xyzzy
625 with support.captured_output("stderr") as s:
626 self.assertRaises(AttributeError, f)
627 s = s.getvalue().strip()
628 if s:
629 # The destructor *may* have printed an unraisable error, check it
630 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000631 self.assertTrue(s.startswith("Exception IOError: "), s)
632 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000633
634 def test_repr(self):
635 raw = self.MockRawIO()
636 b = self.tp(raw)
637 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
638 self.assertEqual(repr(b), "<%s>" % clsname)
639 raw.name = "dummy"
640 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
641 raw.name = b"dummy"
642 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000643
644
Antoine Pitrou19690592009-06-12 20:14:08 +0000645class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
646 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000647
Antoine Pitrou19690592009-06-12 20:14:08 +0000648 def test_constructor(self):
649 rawio = self.MockRawIO([b"abc"])
650 bufio = self.tp(rawio)
651 bufio.__init__(rawio)
652 bufio.__init__(rawio, buffer_size=1024)
653 bufio.__init__(rawio, buffer_size=16)
654 self.assertEquals(b"abc", bufio.read())
655 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
656 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
657 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
658 rawio = self.MockRawIO([b"abc"])
659 bufio.__init__(rawio)
660 self.assertEquals(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000661
Antoine Pitrou19690592009-06-12 20:14:08 +0000662 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000663 for arg in (None, 7):
664 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
665 bufio = self.tp(rawio)
666 self.assertEquals(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000667 # Invalid args
668 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000669
Antoine Pitrou19690592009-06-12 20:14:08 +0000670 def test_read1(self):
671 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
672 bufio = self.tp(rawio)
673 self.assertEquals(b"a", bufio.read(1))
674 self.assertEquals(b"b", bufio.read1(1))
675 self.assertEquals(rawio._reads, 1)
676 self.assertEquals(b"c", bufio.read1(100))
677 self.assertEquals(rawio._reads, 1)
678 self.assertEquals(b"d", bufio.read1(100))
679 self.assertEquals(rawio._reads, 2)
680 self.assertEquals(b"efg", bufio.read1(100))
681 self.assertEquals(rawio._reads, 3)
682 self.assertEquals(b"", bufio.read1(100))
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000683 self.assertEquals(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000684 # Invalid args
685 self.assertRaises(ValueError, bufio.read1, -1)
686
687 def test_readinto(self):
688 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
689 bufio = self.tp(rawio)
690 b = bytearray(2)
691 self.assertEquals(bufio.readinto(b), 2)
692 self.assertEquals(b, b"ab")
693 self.assertEquals(bufio.readinto(b), 2)
694 self.assertEquals(b, b"cd")
695 self.assertEquals(bufio.readinto(b), 2)
696 self.assertEquals(b, b"ef")
697 self.assertEquals(bufio.readinto(b), 1)
698 self.assertEquals(b, b"gf")
699 self.assertEquals(bufio.readinto(b), 0)
700 self.assertEquals(b, b"gf")
701
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000702 def test_readlines(self):
703 def bufio():
704 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
705 return self.tp(rawio)
706 self.assertEquals(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
707 self.assertEquals(bufio().readlines(5), [b"abc\n", b"d\n"])
708 self.assertEquals(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
709
Antoine Pitrou19690592009-06-12 20:14:08 +0000710 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000711 data = b"abcdefghi"
712 dlen = len(data)
713
714 tests = [
715 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
716 [ 100, [ 3, 3, 3], [ dlen ] ],
717 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
718 ]
719
720 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000721 rawio = self.MockFileIO(data)
722 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000723 pos = 0
724 for nbytes in buf_read_sizes:
725 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
726 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000727 # this is mildly implementation-dependent
Christian Heimes1a6387e2008-03-26 12:49:49 +0000728 self.assertEquals(rawio.read_history, raw_read_sizes)
729
Antoine Pitrou19690592009-06-12 20:14:08 +0000730 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000731 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000732 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
733 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000734
735 self.assertEquals(b"abcd", bufio.read(6))
736 self.assertEquals(b"e", bufio.read(1))
737 self.assertEquals(b"fg", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000738 self.assertEquals(b"", bufio.peek(1))
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000739 self.assertTrue(None is bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000740 self.assertEquals(b"", bufio.read())
741
Antoine Pitrou19690592009-06-12 20:14:08 +0000742 def test_read_past_eof(self):
743 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
744 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000745
746 self.assertEquals(b"abcdefg", bufio.read(9000))
747
Antoine Pitrou19690592009-06-12 20:14:08 +0000748 def test_read_all(self):
749 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
750 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000751
752 self.assertEquals(b"abcdefg", bufio.read())
753
Antoine Pitrou19690592009-06-12 20:14:08 +0000754 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000755 try:
756 # Write out many bytes with exactly the same number of 0's,
757 # 1's... 255's. This will help us check that concurrent reading
758 # doesn't duplicate or forget contents.
759 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000760 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000761 random.shuffle(l)
762 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000763 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000764 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000765 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000766 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000767 errors = []
768 results = []
769 def f():
770 try:
771 # Intra-buffer read then buffer-flushing read
772 for n in cycle([1, 19]):
773 s = bufio.read(n)
774 if not s:
775 break
776 # list.append() is atomic
777 results.append(s)
778 except Exception as e:
779 errors.append(e)
780 raise
781 threads = [threading.Thread(target=f) for x in range(20)]
782 for t in threads:
783 t.start()
784 time.sleep(0.02) # yield
785 for t in threads:
786 t.join()
787 self.assertFalse(errors,
788 "the following exceptions were caught: %r" % errors)
789 s = b''.join(results)
790 for i in range(256):
791 c = bytes(bytearray([i]))
792 self.assertEqual(s.count(c), N)
793 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000794 support.unlink(support.TESTFN)
795
796 def test_misbehaved_io(self):
797 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
798 bufio = self.tp(rawio)
799 self.assertRaises(IOError, bufio.seek, 0)
800 self.assertRaises(IOError, bufio.tell)
801
802class CBufferedReaderTest(BufferedReaderTest):
803 tp = io.BufferedReader
804
805 def test_constructor(self):
806 BufferedReaderTest.test_constructor(self)
807 # The allocation can succeed on 32-bit builds, e.g. with more
808 # than 2GB RAM and a 64-bit kernel.
809 if sys.maxsize > 0x7FFFFFFF:
810 rawio = self.MockRawIO()
811 bufio = self.tp(rawio)
812 self.assertRaises((OverflowError, MemoryError, ValueError),
813 bufio.__init__, rawio, sys.maxsize)
814
815 def test_initialization(self):
816 rawio = self.MockRawIO([b"abc"])
817 bufio = self.tp(rawio)
818 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
819 self.assertRaises(ValueError, bufio.read)
820 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
821 self.assertRaises(ValueError, bufio.read)
822 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
823 self.assertRaises(ValueError, bufio.read)
824
825 def test_misbehaved_io_read(self):
826 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
827 bufio = self.tp(rawio)
828 # _pyio.BufferedReader seems to implement reading different, so that
829 # checking this is not so easy.
830 self.assertRaises(IOError, bufio.read, 10)
831
832 def test_garbage_collection(self):
833 # C BufferedReader objects are collected.
834 # The Python version has __del__, so it ends into gc.garbage instead
835 rawio = self.FileIO(support.TESTFN, "w+b")
836 f = self.tp(rawio)
837 f.f = f
838 wr = weakref.ref(f)
839 del f
840 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000841 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000842
843class PyBufferedReaderTest(BufferedReaderTest):
844 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000845
846
Antoine Pitrou19690592009-06-12 20:14:08 +0000847class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
848 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000849
Antoine Pitrou19690592009-06-12 20:14:08 +0000850 def test_constructor(self):
851 rawio = self.MockRawIO()
852 bufio = self.tp(rawio)
853 bufio.__init__(rawio)
854 bufio.__init__(rawio, buffer_size=1024)
855 bufio.__init__(rawio, buffer_size=16)
856 self.assertEquals(3, bufio.write(b"abc"))
857 bufio.flush()
858 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
859 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
860 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
861 bufio.__init__(rawio)
862 self.assertEquals(3, bufio.write(b"ghi"))
863 bufio.flush()
864 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000865
Antoine Pitrou19690592009-06-12 20:14:08 +0000866 def test_detach_flush(self):
867 raw = self.MockRawIO()
868 buf = self.tp(raw)
869 buf.write(b"howdy!")
870 self.assertFalse(raw._write_stack)
871 buf.detach()
872 self.assertEqual(raw._write_stack, [b"howdy!"])
873
874 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000875 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +0000876 writer = self.MockRawIO()
877 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000878 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000879 self.assertFalse(writer._write_stack)
880
Antoine Pitrou19690592009-06-12 20:14:08 +0000881 def test_write_overflow(self):
882 writer = self.MockRawIO()
883 bufio = self.tp(writer, 8)
884 contents = b"abcdefghijklmnop"
885 for n in range(0, len(contents), 3):
886 bufio.write(contents[n:n+3])
887 flushed = b"".join(writer._write_stack)
888 # At least (total - 8) bytes were implicitly flushed, perhaps more
889 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000890 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000891
Antoine Pitrou19690592009-06-12 20:14:08 +0000892 def check_writes(self, intermediate_func):
893 # Lots of writes, test the flushed output is as expected.
894 contents = bytes(range(256)) * 1000
895 n = 0
896 writer = self.MockRawIO()
897 bufio = self.tp(writer, 13)
898 # Generator of write sizes: repeat each N 15 times then proceed to N+1
899 def gen_sizes():
900 for size in count(1):
901 for i in range(15):
902 yield size
903 sizes = gen_sizes()
904 while n < len(contents):
905 size = min(next(sizes), len(contents) - n)
906 self.assertEquals(bufio.write(contents[n:n+size]), size)
907 intermediate_func(bufio)
908 n += size
909 bufio.flush()
910 self.assertEquals(contents,
911 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000912
Antoine Pitrou19690592009-06-12 20:14:08 +0000913 def test_writes(self):
914 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000915
Antoine Pitrou19690592009-06-12 20:14:08 +0000916 def test_writes_and_flushes(self):
917 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000918
Antoine Pitrou19690592009-06-12 20:14:08 +0000919 def test_writes_and_seeks(self):
920 def _seekabs(bufio):
921 pos = bufio.tell()
922 bufio.seek(pos + 1, 0)
923 bufio.seek(pos - 1, 0)
924 bufio.seek(pos, 0)
925 self.check_writes(_seekabs)
926 def _seekrel(bufio):
927 pos = bufio.seek(0, 1)
928 bufio.seek(+1, 1)
929 bufio.seek(-1, 1)
930 bufio.seek(pos, 0)
931 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000932
Antoine Pitrou19690592009-06-12 20:14:08 +0000933 def test_writes_and_truncates(self):
934 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000935
Antoine Pitrou19690592009-06-12 20:14:08 +0000936 def test_write_non_blocking(self):
937 raw = self.MockNonBlockWriterIO()
938 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000939
Antoine Pitrou19690592009-06-12 20:14:08 +0000940 self.assertEquals(bufio.write(b"abcd"), 4)
941 self.assertEquals(bufio.write(b"efghi"), 5)
942 # 1 byte will be written, the rest will be buffered
943 raw.block_on(b"k")
944 self.assertEquals(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000945
Antoine Pitrou19690592009-06-12 20:14:08 +0000946 # 8 bytes will be written, 8 will be buffered and the rest will be lost
947 raw.block_on(b"0")
948 try:
949 bufio.write(b"opqrwxyz0123456789")
950 except self.BlockingIOError as e:
951 written = e.characters_written
952 else:
953 self.fail("BlockingIOError should have been raised")
954 self.assertEquals(written, 16)
955 self.assertEquals(raw.pop_written(),
956 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000957
Antoine Pitrou19690592009-06-12 20:14:08 +0000958 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
959 s = raw.pop_written()
960 # Previously buffered bytes were flushed
961 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000962
Antoine Pitrou19690592009-06-12 20:14:08 +0000963 def test_write_and_rewind(self):
964 raw = io.BytesIO()
965 bufio = self.tp(raw, 4)
966 self.assertEqual(bufio.write(b"abcdef"), 6)
967 self.assertEqual(bufio.tell(), 6)
968 bufio.seek(0, 0)
969 self.assertEqual(bufio.write(b"XY"), 2)
970 bufio.seek(6, 0)
971 self.assertEqual(raw.getvalue(), b"XYcdef")
972 self.assertEqual(bufio.write(b"123456"), 6)
973 bufio.flush()
974 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000975
Antoine Pitrou19690592009-06-12 20:14:08 +0000976 def test_flush(self):
977 writer = self.MockRawIO()
978 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000979 bufio.write(b"abc")
980 bufio.flush()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000981 self.assertEquals(b"abc", writer._write_stack[0])
982
Antoine Pitrou19690592009-06-12 20:14:08 +0000983 def test_destructor(self):
984 writer = self.MockRawIO()
985 bufio = self.tp(writer, 8)
986 bufio.write(b"abc")
987 del bufio
988 support.gc_collect()
989 self.assertEquals(b"abc", writer._write_stack[0])
990
991 def test_truncate(self):
992 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000993 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000994 bufio = self.tp(raw, 8)
995 bufio.write(b"abcdef")
996 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000997 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000998 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000999 self.assertEqual(f.read(), b"abc")
1000
1001 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001002 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001003 # Write out many bytes from many threads and test they were
1004 # all flushed.
1005 N = 1000
1006 contents = bytes(range(256)) * N
1007 sizes = cycle([1, 19])
1008 n = 0
1009 queue = deque()
1010 while n < len(contents):
1011 size = next(sizes)
1012 queue.append(contents[n:n+size])
1013 n += size
1014 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001015 # We use a real file object because it allows us to
1016 # exercise situations where the GIL is released before
1017 # writing the buffer to the raw streams. This is in addition
1018 # to concurrency issues due to switching threads in the middle
1019 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001020 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001021 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001022 errors = []
1023 def f():
1024 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001025 while True:
1026 try:
1027 s = queue.popleft()
1028 except IndexError:
1029 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001030 bufio.write(s)
1031 except Exception as e:
1032 errors.append(e)
1033 raise
1034 threads = [threading.Thread(target=f) for x in range(20)]
1035 for t in threads:
1036 t.start()
1037 time.sleep(0.02) # yield
1038 for t in threads:
1039 t.join()
1040 self.assertFalse(errors,
1041 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001042 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001043 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001044 s = f.read()
1045 for i in range(256):
1046 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001047 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001048 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001049
Antoine Pitrou19690592009-06-12 20:14:08 +00001050 def test_misbehaved_io(self):
1051 rawio = self.MisbehavedRawIO()
1052 bufio = self.tp(rawio, 5)
1053 self.assertRaises(IOError, bufio.seek, 0)
1054 self.assertRaises(IOError, bufio.tell)
1055 self.assertRaises(IOError, bufio.write, b"abcdef")
1056
1057 def test_max_buffer_size_deprecation(self):
1058 with support.check_warnings() as w:
1059 warnings.simplefilter("always", DeprecationWarning)
1060 self.tp(self.MockRawIO(), 8, 12)
1061 self.assertEqual(len(w.warnings), 1)
1062 warning = w.warnings[0]
1063 self.assertTrue(warning.category is DeprecationWarning)
1064 self.assertEqual(str(warning.message),
1065 "max_buffer_size is deprecated")
1066
1067
1068class CBufferedWriterTest(BufferedWriterTest):
1069 tp = io.BufferedWriter
1070
1071 def test_constructor(self):
1072 BufferedWriterTest.test_constructor(self)
1073 # The allocation can succeed on 32-bit builds, e.g. with more
1074 # than 2GB RAM and a 64-bit kernel.
1075 if sys.maxsize > 0x7FFFFFFF:
1076 rawio = self.MockRawIO()
1077 bufio = self.tp(rawio)
1078 self.assertRaises((OverflowError, MemoryError, ValueError),
1079 bufio.__init__, rawio, sys.maxsize)
1080
1081 def test_initialization(self):
1082 rawio = self.MockRawIO()
1083 bufio = self.tp(rawio)
1084 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1085 self.assertRaises(ValueError, bufio.write, b"def")
1086 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1087 self.assertRaises(ValueError, bufio.write, b"def")
1088 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1089 self.assertRaises(ValueError, bufio.write, b"def")
1090
1091 def test_garbage_collection(self):
1092 # C BufferedWriter objects are collected, and collecting them flushes
1093 # all data to disk.
1094 # The Python version has __del__, so it ends into gc.garbage instead
1095 rawio = self.FileIO(support.TESTFN, "w+b")
1096 f = self.tp(rawio)
1097 f.write(b"123xxx")
1098 f.x = f
1099 wr = weakref.ref(f)
1100 del f
1101 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001102 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001103 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001104 self.assertEqual(f.read(), b"123xxx")
1105
1106
1107class PyBufferedWriterTest(BufferedWriterTest):
1108 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001109
1110class BufferedRWPairTest(unittest.TestCase):
1111
Antoine Pitrou19690592009-06-12 20:14:08 +00001112 def test_constructor(self):
1113 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001114 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001115
Antoine Pitrou19690592009-06-12 20:14:08 +00001116 def test_detach(self):
1117 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1118 self.assertRaises(self.UnsupportedOperation, pair.detach)
1119
1120 def test_constructor_max_buffer_size_deprecation(self):
1121 with support.check_warnings() as w:
1122 warnings.simplefilter("always", DeprecationWarning)
1123 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1124 self.assertEqual(len(w.warnings), 1)
1125 warning = w.warnings[0]
1126 self.assertTrue(warning.category is DeprecationWarning)
1127 self.assertEqual(str(warning.message),
1128 "max_buffer_size is deprecated")
1129
1130 def test_constructor_with_not_readable(self):
1131 class NotReadable(MockRawIO):
1132 def readable(self):
1133 return False
1134
1135 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1136
1137 def test_constructor_with_not_writeable(self):
1138 class NotWriteable(MockRawIO):
1139 def writable(self):
1140 return False
1141
1142 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1143
1144 def test_read(self):
1145 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1146
1147 self.assertEqual(pair.read(3), b"abc")
1148 self.assertEqual(pair.read(1), b"d")
1149 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001150 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1151 self.assertEqual(pair.read(None), b"abc")
1152
1153 def test_readlines(self):
1154 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1155 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1156 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1157 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001158
1159 def test_read1(self):
1160 # .read1() is delegated to the underlying reader object, so this test
1161 # can be shallow.
1162 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1163
1164 self.assertEqual(pair.read1(3), b"abc")
1165
1166 def test_readinto(self):
1167 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1168
1169 data = bytearray(5)
1170 self.assertEqual(pair.readinto(data), 5)
1171 self.assertEqual(data, b"abcde")
1172
1173 def test_write(self):
1174 w = self.MockRawIO()
1175 pair = self.tp(self.MockRawIO(), w)
1176
1177 pair.write(b"abc")
1178 pair.flush()
1179 pair.write(b"def")
1180 pair.flush()
1181 self.assertEqual(w._write_stack, [b"abc", b"def"])
1182
1183 def test_peek(self):
1184 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1185
1186 self.assertTrue(pair.peek(3).startswith(b"abc"))
1187 self.assertEqual(pair.read(3), b"abc")
1188
1189 def test_readable(self):
1190 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1191 self.assertTrue(pair.readable())
1192
1193 def test_writeable(self):
1194 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1195 self.assertTrue(pair.writable())
1196
1197 def test_seekable(self):
1198 # BufferedRWPairs are never seekable, even if their readers and writers
1199 # are.
1200 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1201 self.assertFalse(pair.seekable())
1202
1203 # .flush() is delegated to the underlying writer object and has been
1204 # tested in the test_write method.
1205
1206 def test_close_and_closed(self):
1207 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1208 self.assertFalse(pair.closed)
1209 pair.close()
1210 self.assertTrue(pair.closed)
1211
1212 def test_isatty(self):
1213 class SelectableIsAtty(MockRawIO):
1214 def __init__(self, isatty):
1215 MockRawIO.__init__(self)
1216 self._isatty = isatty
1217
1218 def isatty(self):
1219 return self._isatty
1220
1221 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1222 self.assertFalse(pair.isatty())
1223
1224 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1225 self.assertTrue(pair.isatty())
1226
1227 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1228 self.assertTrue(pair.isatty())
1229
1230 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1231 self.assertTrue(pair.isatty())
1232
1233class CBufferedRWPairTest(BufferedRWPairTest):
1234 tp = io.BufferedRWPair
1235
1236class PyBufferedRWPairTest(BufferedRWPairTest):
1237 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001238
1239
Antoine Pitrou19690592009-06-12 20:14:08 +00001240class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1241 read_mode = "rb+"
1242 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001243
Antoine Pitrou19690592009-06-12 20:14:08 +00001244 def test_constructor(self):
1245 BufferedReaderTest.test_constructor(self)
1246 BufferedWriterTest.test_constructor(self)
1247
1248 def test_read_and_write(self):
1249 raw = self.MockRawIO((b"asdf", b"ghjk"))
1250 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001251
1252 self.assertEqual(b"as", rw.read(2))
1253 rw.write(b"ddd")
1254 rw.write(b"eee")
1255 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001256 self.assertEqual(b"ghjk", rw.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001257 self.assertEquals(b"dddeee", raw._write_stack[0])
1258
Antoine Pitrou19690592009-06-12 20:14:08 +00001259 def test_seek_and_tell(self):
1260 raw = self.BytesIO(b"asdfghjkl")
1261 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001262
1263 self.assertEquals(b"as", rw.read(2))
1264 self.assertEquals(2, rw.tell())
1265 rw.seek(0, 0)
1266 self.assertEquals(b"asdf", rw.read(4))
1267
1268 rw.write(b"asdf")
1269 rw.seek(0, 0)
1270 self.assertEquals(b"asdfasdfl", rw.read())
1271 self.assertEquals(9, rw.tell())
1272 rw.seek(-4, 2)
1273 self.assertEquals(5, rw.tell())
1274 rw.seek(2, 1)
1275 self.assertEquals(7, rw.tell())
1276 self.assertEquals(b"fl", rw.read(11))
1277 self.assertRaises(TypeError, rw.seek, 0.0)
1278
Antoine Pitrou19690592009-06-12 20:14:08 +00001279 def check_flush_and_read(self, read_func):
1280 raw = self.BytesIO(b"abcdefghi")
1281 bufio = self.tp(raw)
1282
1283 self.assertEquals(b"ab", read_func(bufio, 2))
1284 bufio.write(b"12")
1285 self.assertEquals(b"ef", read_func(bufio, 2))
1286 self.assertEquals(6, bufio.tell())
1287 bufio.flush()
1288 self.assertEquals(6, bufio.tell())
1289 self.assertEquals(b"ghi", read_func(bufio))
1290 raw.seek(0, 0)
1291 raw.write(b"XYZ")
1292 # flush() resets the read buffer
1293 bufio.flush()
1294 bufio.seek(0, 0)
1295 self.assertEquals(b"XYZ", read_func(bufio, 3))
1296
1297 def test_flush_and_read(self):
1298 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1299
1300 def test_flush_and_readinto(self):
1301 def _readinto(bufio, n=-1):
1302 b = bytearray(n if n >= 0 else 9999)
1303 n = bufio.readinto(b)
1304 return bytes(b[:n])
1305 self.check_flush_and_read(_readinto)
1306
1307 def test_flush_and_peek(self):
1308 def _peek(bufio, n=-1):
1309 # This relies on the fact that the buffer can contain the whole
1310 # raw stream, otherwise peek() can return less.
1311 b = bufio.peek(n)
1312 if n != -1:
1313 b = b[:n]
1314 bufio.seek(len(b), 1)
1315 return b
1316 self.check_flush_and_read(_peek)
1317
1318 def test_flush_and_write(self):
1319 raw = self.BytesIO(b"abcdefghi")
1320 bufio = self.tp(raw)
1321
1322 bufio.write(b"123")
1323 bufio.flush()
1324 bufio.write(b"45")
1325 bufio.flush()
1326 bufio.seek(0, 0)
1327 self.assertEquals(b"12345fghi", raw.getvalue())
1328 self.assertEquals(b"12345fghi", bufio.read())
1329
1330 def test_threads(self):
1331 BufferedReaderTest.test_threads(self)
1332 BufferedWriterTest.test_threads(self)
1333
1334 def test_writes_and_peek(self):
1335 def _peek(bufio):
1336 bufio.peek(1)
1337 self.check_writes(_peek)
1338 def _peek(bufio):
1339 pos = bufio.tell()
1340 bufio.seek(-1, 1)
1341 bufio.peek(1)
1342 bufio.seek(pos, 0)
1343 self.check_writes(_peek)
1344
1345 def test_writes_and_reads(self):
1346 def _read(bufio):
1347 bufio.seek(-1, 1)
1348 bufio.read(1)
1349 self.check_writes(_read)
1350
1351 def test_writes_and_read1s(self):
1352 def _read1(bufio):
1353 bufio.seek(-1, 1)
1354 bufio.read1(1)
1355 self.check_writes(_read1)
1356
1357 def test_writes_and_readintos(self):
1358 def _read(bufio):
1359 bufio.seek(-1, 1)
1360 bufio.readinto(bytearray(1))
1361 self.check_writes(_read)
1362
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001363 def test_write_after_readahead(self):
1364 # Issue #6629: writing after the buffer was filled by readahead should
1365 # first rewind the raw stream.
1366 for overwrite_size in [1, 5]:
1367 raw = self.BytesIO(b"A" * 10)
1368 bufio = self.tp(raw, 4)
1369 # Trigger readahead
1370 self.assertEqual(bufio.read(1), b"A")
1371 self.assertEqual(bufio.tell(), 1)
1372 # Overwriting should rewind the raw stream if it needs so
1373 bufio.write(b"B" * overwrite_size)
1374 self.assertEqual(bufio.tell(), overwrite_size + 1)
1375 # If the write size was smaller than the buffer size, flush() and
1376 # check that rewind happens.
1377 bufio.flush()
1378 self.assertEqual(bufio.tell(), overwrite_size + 1)
1379 s = raw.getvalue()
1380 self.assertEqual(s,
1381 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1382
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001383 def test_truncate_after_read_or_write(self):
1384 raw = self.BytesIO(b"A" * 10)
1385 bufio = self.tp(raw, 100)
1386 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1387 self.assertEqual(bufio.truncate(), 2)
1388 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1389 self.assertEqual(bufio.truncate(), 4)
1390
Antoine Pitrou19690592009-06-12 20:14:08 +00001391 def test_misbehaved_io(self):
1392 BufferedReaderTest.test_misbehaved_io(self)
1393 BufferedWriterTest.test_misbehaved_io(self)
1394
1395class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
1396 tp = io.BufferedRandom
1397
1398 def test_constructor(self):
1399 BufferedRandomTest.test_constructor(self)
1400 # The allocation can succeed on 32-bit builds, e.g. with more
1401 # than 2GB RAM and a 64-bit kernel.
1402 if sys.maxsize > 0x7FFFFFFF:
1403 rawio = self.MockRawIO()
1404 bufio = self.tp(rawio)
1405 self.assertRaises((OverflowError, MemoryError, ValueError),
1406 bufio.__init__, rawio, sys.maxsize)
1407
1408 def test_garbage_collection(self):
1409 CBufferedReaderTest.test_garbage_collection(self)
1410 CBufferedWriterTest.test_garbage_collection(self)
1411
1412class PyBufferedRandomTest(BufferedRandomTest):
1413 tp = pyio.BufferedRandom
1414
1415
Christian Heimes1a6387e2008-03-26 12:49:49 +00001416# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1417# properties:
1418# - A single output character can correspond to many bytes of input.
1419# - The number of input bytes to complete the character can be
1420# undetermined until the last input byte is received.
1421# - The number of input bytes can vary depending on previous input.
1422# - A single input byte can correspond to many characters of output.
1423# - The number of output characters can be undetermined until the
1424# last input byte is received.
1425# - The number of output characters can vary depending on previous input.
1426
1427class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1428 """
1429 For testing seek/tell behavior with a stateful, buffering decoder.
1430
1431 Input is a sequence of words. Words may be fixed-length (length set
1432 by input) or variable-length (period-terminated). In variable-length
1433 mode, extra periods are ignored. Possible words are:
1434 - 'i' followed by a number sets the input length, I (maximum 99).
1435 When I is set to 0, words are space-terminated.
1436 - 'o' followed by a number sets the output length, O (maximum 99).
1437 - Any other word is converted into a word followed by a period on
1438 the output. The output word consists of the input word truncated
1439 or padded out with hyphens to make its length equal to O. If O
1440 is 0, the word is output verbatim without truncating or padding.
1441 I and O are initially set to 1. When I changes, any buffered input is
1442 re-scanned according to the new I. EOF also terminates the last word.
1443 """
1444
1445 def __init__(self, errors='strict'):
1446 codecs.IncrementalDecoder.__init__(self, errors)
1447 self.reset()
1448
1449 def __repr__(self):
1450 return '<SID %x>' % id(self)
1451
1452 def reset(self):
1453 self.i = 1
1454 self.o = 1
1455 self.buffer = bytearray()
1456
1457 def getstate(self):
1458 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1459 return bytes(self.buffer), i*100 + o
1460
1461 def setstate(self, state):
1462 buffer, io = state
1463 self.buffer = bytearray(buffer)
1464 i, o = divmod(io, 100)
1465 self.i, self.o = i ^ 1, o ^ 1
1466
1467 def decode(self, input, final=False):
1468 output = ''
1469 for b in input:
1470 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001471 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001472 if self.buffer:
1473 output += self.process_word()
1474 else:
1475 self.buffer.append(b)
1476 else: # fixed-length, terminate after self.i bytes
1477 self.buffer.append(b)
1478 if len(self.buffer) == self.i:
1479 output += self.process_word()
1480 if final and self.buffer: # EOF terminates the last word
1481 output += self.process_word()
1482 return output
1483
1484 def process_word(self):
1485 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001486 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001487 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001488 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001489 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1490 else:
1491 output = self.buffer.decode('ascii')
1492 if len(output) < self.o:
1493 output += '-'*self.o # pad out with hyphens
1494 if self.o:
1495 output = output[:self.o] # truncate to output length
1496 output += '.'
1497 self.buffer = bytearray()
1498 return output
1499
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001500 codecEnabled = False
1501
1502 @classmethod
1503 def lookupTestDecoder(cls, name):
1504 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001505 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001506 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001507 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001508 incrementalencoder=None,
1509 streamreader=None, streamwriter=None,
1510 incrementaldecoder=cls)
1511
1512# Register the previous decoder for testing.
1513# Disabled by default, tests will enable it.
1514codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1515
1516
Christian Heimes1a6387e2008-03-26 12:49:49 +00001517class StatefulIncrementalDecoderTest(unittest.TestCase):
1518 """
1519 Make sure the StatefulIncrementalDecoder actually works.
1520 """
1521
1522 test_cases = [
1523 # I=1, O=1 (fixed-length input == fixed-length output)
1524 (b'abcd', False, 'a.b.c.d.'),
1525 # I=0, O=0 (variable-length input, variable-length output)
1526 (b'oiabcd', True, 'abcd.'),
1527 # I=0, O=0 (should ignore extra periods)
1528 (b'oi...abcd...', True, 'abcd.'),
1529 # I=0, O=6 (variable-length input, fixed-length output)
1530 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1531 # I=2, O=6 (fixed-length input < fixed-length output)
1532 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1533 # I=6, O=3 (fixed-length input > fixed-length output)
1534 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1535 # I=0, then 3; O=29, then 15 (with longer output)
1536 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1537 'a----------------------------.' +
1538 'b----------------------------.' +
1539 'cde--------------------------.' +
1540 'abcdefghijabcde.' +
1541 'a.b------------.' +
1542 '.c.------------.' +
1543 'd.e------------.' +
1544 'k--------------.' +
1545 'l--------------.' +
1546 'm--------------.')
1547 ]
1548
Antoine Pitrou19690592009-06-12 20:14:08 +00001549 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001550 # Try a few one-shot test cases.
1551 for input, eof, output in self.test_cases:
1552 d = StatefulIncrementalDecoder()
1553 self.assertEquals(d.decode(input, eof), output)
1554
1555 # Also test an unfinished decode, followed by forcing EOF.
1556 d = StatefulIncrementalDecoder()
1557 self.assertEquals(d.decode(b'oiabcd'), '')
1558 self.assertEquals(d.decode(b'', 1), 'abcd.')
1559
1560class TextIOWrapperTest(unittest.TestCase):
1561
1562 def setUp(self):
1563 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1564 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001565 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001566
1567 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001568 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001569
Antoine Pitrou19690592009-06-12 20:14:08 +00001570 def test_constructor(self):
1571 r = self.BytesIO(b"\xc3\xa9\n\n")
1572 b = self.BufferedReader(r, 1000)
1573 t = self.TextIOWrapper(b)
1574 t.__init__(b, encoding="latin1", newline="\r\n")
1575 self.assertEquals(t.encoding, "latin1")
1576 self.assertEquals(t.line_buffering, False)
1577 t.__init__(b, encoding="utf8", line_buffering=True)
1578 self.assertEquals(t.encoding, "utf8")
1579 self.assertEquals(t.line_buffering, True)
1580 self.assertEquals("\xe9\n", t.readline())
1581 self.assertRaises(TypeError, t.__init__, b, newline=42)
1582 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1583
1584 def test_detach(self):
1585 r = self.BytesIO()
1586 b = self.BufferedWriter(r)
1587 t = self.TextIOWrapper(b)
1588 self.assertIs(t.detach(), b)
1589
1590 t = self.TextIOWrapper(b, encoding="ascii")
1591 t.write("howdy")
1592 self.assertFalse(r.getvalue())
1593 t.detach()
1594 self.assertEqual(r.getvalue(), b"howdy")
1595 self.assertRaises(ValueError, t.detach)
1596
1597 def test_repr(self):
1598 raw = self.BytesIO("hello".encode("utf-8"))
1599 b = self.BufferedReader(raw)
1600 t = self.TextIOWrapper(b, encoding="utf-8")
1601 modname = self.TextIOWrapper.__module__
1602 self.assertEqual(repr(t),
1603 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1604 raw.name = "dummy"
1605 self.assertEqual(repr(t),
1606 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1607 raw.name = b"dummy"
1608 self.assertEqual(repr(t),
1609 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1610
1611 def test_line_buffering(self):
1612 r = self.BytesIO()
1613 b = self.BufferedWriter(r, 1000)
1614 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1615 t.write("X")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001616 self.assertEquals(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001617 t.write("Y\nZ")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001618 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001619 t.write("A\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001620 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1621
Antoine Pitrou19690592009-06-12 20:14:08 +00001622 def test_encoding(self):
1623 # Check the encoding attribute is always set, and valid
1624 b = self.BytesIO()
1625 t = self.TextIOWrapper(b, encoding="utf8")
1626 self.assertEqual(t.encoding, "utf8")
1627 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001628 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001629 codecs.lookup(t.encoding)
1630
1631 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001632 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001633 b = self.BytesIO(b"abc\n\xff\n")
1634 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001635 self.assertRaises(UnicodeError, t.read)
1636 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001637 b = self.BytesIO(b"abc\n\xff\n")
1638 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001639 self.assertRaises(UnicodeError, t.read)
1640 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001641 b = self.BytesIO(b"abc\n\xff\n")
1642 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001643 self.assertEquals(t.read(), "abc\n\n")
1644 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001645 b = self.BytesIO(b"abc\n\xff\n")
1646 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
1647 self.assertEquals(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001648
Antoine Pitrou19690592009-06-12 20:14:08 +00001649 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001650 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001651 b = self.BytesIO()
1652 t = self.TextIOWrapper(b, encoding="ascii")
1653 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001654 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001655 b = self.BytesIO()
1656 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1657 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001658 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001659 b = self.BytesIO()
1660 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001661 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001662 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001663 t.flush()
1664 self.assertEquals(b.getvalue(), b"abcdef\n")
1665 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001666 b = self.BytesIO()
1667 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001668 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001669 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001670 t.flush()
1671 self.assertEquals(b.getvalue(), b"abc?def\n")
1672
Antoine Pitrou19690592009-06-12 20:14:08 +00001673 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001674 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1675
1676 tests = [
1677 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1678 [ '', input_lines ],
1679 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1680 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1681 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1682 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001683 encodings = (
1684 'utf-8', 'latin-1',
1685 'utf-16', 'utf-16-le', 'utf-16-be',
1686 'utf-32', 'utf-32-le', 'utf-32-be',
1687 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001688
1689 # Try a range of buffer sizes to test the case where \r is the last
1690 # character in TextIOWrapper._pending_line.
1691 for encoding in encodings:
1692 # XXX: str.encode() should return bytes
1693 data = bytes(''.join(input_lines).encode(encoding))
1694 for do_reads in (False, True):
1695 for bufsize in range(1, 10):
1696 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001697 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1698 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001699 encoding=encoding)
1700 if do_reads:
1701 got_lines = []
1702 while True:
1703 c2 = textio.read(2)
1704 if c2 == '':
1705 break
1706 self.assertEquals(len(c2), 2)
1707 got_lines.append(c2 + textio.readline())
1708 else:
1709 got_lines = list(textio)
1710
1711 for got_line, exp_line in zip(got_lines, exp_lines):
1712 self.assertEquals(got_line, exp_line)
1713 self.assertEquals(len(got_lines), len(exp_lines))
1714
Antoine Pitrou19690592009-06-12 20:14:08 +00001715 def test_newlines_input(self):
1716 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001717 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1718 for newline, expected in [
1719 (None, normalized.decode("ascii").splitlines(True)),
1720 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001721 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1722 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1723 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001724 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001725 buf = self.BytesIO(testdata)
1726 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001727 self.assertEquals(txt.readlines(), expected)
1728 txt.seek(0)
1729 self.assertEquals(txt.read(), "".join(expected))
1730
Antoine Pitrou19690592009-06-12 20:14:08 +00001731 def test_newlines_output(self):
1732 testdict = {
1733 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1734 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1735 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1736 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1737 }
1738 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1739 for newline, expected in tests:
1740 buf = self.BytesIO()
1741 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1742 txt.write("AAA\nB")
1743 txt.write("BB\nCCC\n")
1744 txt.write("X\rY\r\nZ")
1745 txt.flush()
1746 self.assertEquals(buf.closed, False)
1747 self.assertEquals(buf.getvalue(), expected)
1748
1749 def test_destructor(self):
1750 l = []
1751 base = self.BytesIO
1752 class MyBytesIO(base):
1753 def close(self):
1754 l.append(self.getvalue())
1755 base.close(self)
1756 b = MyBytesIO()
1757 t = self.TextIOWrapper(b, encoding="ascii")
1758 t.write("abc")
1759 del t
1760 support.gc_collect()
1761 self.assertEquals([b"abc"], l)
1762
1763 def test_override_destructor(self):
1764 record = []
1765 class MyTextIO(self.TextIOWrapper):
1766 def __del__(self):
1767 record.append(1)
1768 try:
1769 f = super(MyTextIO, self).__del__
1770 except AttributeError:
1771 pass
1772 else:
1773 f()
1774 def close(self):
1775 record.append(2)
1776 super(MyTextIO, self).close()
1777 def flush(self):
1778 record.append(3)
1779 super(MyTextIO, self).flush()
1780 b = self.BytesIO()
1781 t = MyTextIO(b, encoding="ascii")
1782 del t
1783 support.gc_collect()
1784 self.assertEqual(record, [1, 2, 3])
1785
1786 def test_error_through_destructor(self):
1787 # Test that the exception state is not modified by a destructor,
1788 # even if close() fails.
1789 rawio = self.CloseFailureIO()
1790 def f():
1791 self.TextIOWrapper(rawio).xyzzy
1792 with support.captured_output("stderr") as s:
1793 self.assertRaises(AttributeError, f)
1794 s = s.getvalue().strip()
1795 if s:
1796 # The destructor *may* have printed an unraisable error, check it
1797 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001798 self.assertTrue(s.startswith("Exception IOError: "), s)
1799 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001800
1801 # Systematic tests of the text I/O API
1802
Antoine Pitrou19690592009-06-12 20:14:08 +00001803 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001804 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1805 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00001806 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001807 f._CHUNK_SIZE = chunksize
Antoine Pitrou19690592009-06-12 20:14:08 +00001808 self.assertEquals(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001809 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001810 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001811 f._CHUNK_SIZE = chunksize
1812 self.assertEquals(f.tell(), 0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001813 self.assertEquals(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001814 cookie = f.tell()
1815 self.assertEquals(f.seek(0), 0)
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001816 self.assertEquals(f.read(None), "abc")
1817 f.seek(0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001818 self.assertEquals(f.read(2), "ab")
1819 self.assertEquals(f.read(1), "c")
1820 self.assertEquals(f.read(1), "")
1821 self.assertEquals(f.read(), "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001822 self.assertEquals(f.tell(), cookie)
1823 self.assertEquals(f.seek(0), 0)
1824 self.assertEquals(f.seek(0, 2), cookie)
Antoine Pitrou19690592009-06-12 20:14:08 +00001825 self.assertEquals(f.write("def"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001826 self.assertEquals(f.seek(cookie), cookie)
Antoine Pitrou19690592009-06-12 20:14:08 +00001827 self.assertEquals(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001828 if enc.startswith("utf"):
1829 self.multi_line_test(f, enc)
1830 f.close()
1831
1832 def multi_line_test(self, f, enc):
1833 f.seek(0)
1834 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00001835 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001836 wlines = []
1837 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1838 chars = []
1839 for i in range(size):
1840 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00001841 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001842 wlines.append((f.tell(), line))
1843 f.write(line)
1844 f.seek(0)
1845 rlines = []
1846 while True:
1847 pos = f.tell()
1848 line = f.readline()
1849 if not line:
1850 break
1851 rlines.append((pos, line))
1852 self.assertEquals(rlines, wlines)
1853
Antoine Pitrou19690592009-06-12 20:14:08 +00001854 def test_telling(self):
1855 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001856 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001857 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001858 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001859 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001860 p2 = f.tell()
1861 f.seek(0)
1862 self.assertEquals(f.tell(), p0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001863 self.assertEquals(f.readline(), "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001864 self.assertEquals(f.tell(), p1)
Antoine Pitrou19690592009-06-12 20:14:08 +00001865 self.assertEquals(f.readline(), "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001866 self.assertEquals(f.tell(), p2)
1867 f.seek(0)
1868 for line in f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001869 self.assertEquals(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001870 self.assertRaises(IOError, f.tell)
1871 self.assertEquals(f.tell(), p2)
1872 f.close()
1873
Antoine Pitrou19690592009-06-12 20:14:08 +00001874 def test_seeking(self):
1875 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00001876 prefix_size = chunk_size - 2
1877 u_prefix = "a" * prefix_size
1878 prefix = bytes(u_prefix.encode("utf-8"))
1879 self.assertEquals(len(u_prefix), len(prefix))
1880 u_suffix = "\u8888\n"
1881 suffix = bytes(u_suffix.encode("utf-8"))
1882 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00001883 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001884 f.write(line*2)
1885 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001886 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001887 s = f.read(prefix_size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001888 self.assertEquals(s, prefix.decode("ascii"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001889 self.assertEquals(f.tell(), prefix_size)
1890 self.assertEquals(f.readline(), u_suffix)
1891
Antoine Pitrou19690592009-06-12 20:14:08 +00001892 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001893 # Regression test for a specific bug
1894 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00001895 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001896 f.write(data)
1897 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001898 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001899 f._CHUNK_SIZE # Just test that it exists
1900 f._CHUNK_SIZE = 2
1901 f.readline()
1902 f.tell()
1903
Antoine Pitrou19690592009-06-12 20:14:08 +00001904 def test_seek_and_tell(self):
1905 #Test seek/tell using the StatefulIncrementalDecoder.
1906 # Make test faster by doing smaller seeks
1907 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00001908
Antoine Pitrou19690592009-06-12 20:14:08 +00001909 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001910 """Tell/seek to various points within a data stream and ensure
1911 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00001912 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001913 f.write(data)
1914 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001915 f = self.open(support.TESTFN, encoding='test_decoder')
1916 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00001917 decoded = f.read()
1918 f.close()
1919
1920 for i in range(min_pos, len(decoded) + 1): # seek positions
1921 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00001922 f = self.open(support.TESTFN, encoding='test_decoder')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001923 self.assertEquals(f.read(i), decoded[:i])
1924 cookie = f.tell()
1925 self.assertEquals(f.read(j), decoded[i:i + j])
1926 f.seek(cookie)
1927 self.assertEquals(f.read(), decoded[i:])
1928 f.close()
1929
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001930 # Enable the test decoder.
1931 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00001932
1933 # Run the tests.
1934 try:
1935 # Try each test case.
1936 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00001937 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001938
1939 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00001940 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1941 offset = CHUNK_SIZE - len(input)//2
1942 prefix = b'.'*offset
1943 # Don't bother seeking into the prefix (takes too long).
1944 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00001945 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001946
1947 # Ensure our test decoder won't interfere with subsequent tests.
1948 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001949 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00001950
Antoine Pitrou19690592009-06-12 20:14:08 +00001951 def test_encoded_writes(self):
1952 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001953 tests = ("utf-16",
1954 "utf-16-le",
1955 "utf-16-be",
1956 "utf-32",
1957 "utf-32-le",
1958 "utf-32-be")
1959 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001960 buf = self.BytesIO()
1961 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001962 # Check if the BOM is written only once (see issue1753).
1963 f.write(data)
1964 f.write(data)
1965 f.seek(0)
1966 self.assertEquals(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00001967 f.seek(0)
1968 self.assertEquals(f.read(), data * 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001969 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1970
Antoine Pitrou19690592009-06-12 20:14:08 +00001971 def test_unreadable(self):
1972 class UnReadable(self.BytesIO):
1973 def readable(self):
1974 return False
1975 txt = self.TextIOWrapper(UnReadable())
1976 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001977
Antoine Pitrou19690592009-06-12 20:14:08 +00001978 def test_read_one_by_one(self):
1979 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001980 reads = ""
1981 while True:
1982 c = txt.read(1)
1983 if not c:
1984 break
1985 reads += c
1986 self.assertEquals(reads, "AA\nBB")
1987
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001988 def test_readlines(self):
1989 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
1990 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
1991 txt.seek(0)
1992 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
1993 txt.seek(0)
1994 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
1995
Christian Heimes1a6387e2008-03-26 12:49:49 +00001996 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00001997 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001998 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00001999 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002000 reads = ""
2001 while True:
2002 c = txt.read(128)
2003 if not c:
2004 break
2005 reads += c
2006 self.assertEquals(reads, "A"*127+"\nB")
2007
2008 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002009 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002010
2011 # read one char at a time
2012 reads = ""
2013 while True:
2014 c = txt.read(1)
2015 if not c:
2016 break
2017 reads += c
2018 self.assertEquals(reads, self.normalized)
2019
2020 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002021 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002022 txt._CHUNK_SIZE = 4
2023
2024 reads = ""
2025 while True:
2026 c = txt.read(4)
2027 if not c:
2028 break
2029 reads += c
2030 self.assertEquals(reads, self.normalized)
2031
2032 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002033 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002034 txt._CHUNK_SIZE = 4
2035
2036 reads = txt.read(4)
2037 reads += txt.read(4)
2038 reads += txt.readline()
2039 reads += txt.readline()
2040 reads += txt.readline()
2041 self.assertEquals(reads, self.normalized)
2042
2043 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002044 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002045 txt._CHUNK_SIZE = 4
2046
2047 reads = txt.read(4)
2048 reads += txt.read()
2049 self.assertEquals(reads, self.normalized)
2050
2051 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002052 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002053 txt._CHUNK_SIZE = 4
2054
2055 reads = txt.read(4)
2056 pos = txt.tell()
2057 txt.seek(0)
2058 txt.seek(pos)
2059 self.assertEquals(txt.read(4), "BBB\n")
2060
2061 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002062 buffer = self.BytesIO(self.testdata)
2063 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002064
2065 self.assertEqual(buffer.seekable(), txt.seekable())
2066
Antoine Pitrou19690592009-06-12 20:14:08 +00002067 @unittest.skip("Issue #6213 with incremental encoders")
2068 def test_append_bom(self):
2069 # The BOM is not written again when appending to a non-empty file
2070 filename = support.TESTFN
2071 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2072 with self.open(filename, 'w', encoding=charset) as f:
2073 f.write('aaa')
2074 pos = f.tell()
2075 with self.open(filename, 'rb') as f:
2076 self.assertEquals(f.read(), 'aaa'.encode(charset))
2077
2078 with self.open(filename, 'a', encoding=charset) as f:
2079 f.write('xxx')
2080 with self.open(filename, 'rb') as f:
2081 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
2082
2083 @unittest.skip("Issue #6213 with incremental encoders")
2084 def test_seek_bom(self):
2085 # Same test, but when seeking manually
2086 filename = support.TESTFN
2087 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2088 with self.open(filename, 'w', encoding=charset) as f:
2089 f.write('aaa')
2090 pos = f.tell()
2091 with self.open(filename, 'r+', encoding=charset) as f:
2092 f.seek(pos)
2093 f.write('zzz')
2094 f.seek(0)
2095 f.write('bbb')
2096 with self.open(filename, 'rb') as f:
2097 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2098
2099 def test_errors_property(self):
2100 with self.open(support.TESTFN, "w") as f:
2101 self.assertEqual(f.errors, "strict")
2102 with self.open(support.TESTFN, "w", errors="replace") as f:
2103 self.assertEqual(f.errors, "replace")
2104
2105
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002106 def test_threads_write(self):
2107 # Issue6750: concurrent writes could duplicate data
2108 event = threading.Event()
2109 with self.open(support.TESTFN, "w", buffering=1) as f:
2110 def run(n):
2111 text = "Thread%03d\n" % n
2112 event.wait()
2113 f.write(text)
2114 threads = [threading.Thread(target=lambda n=x: run(n))
2115 for x in range(20)]
2116 for t in threads:
2117 t.start()
2118 time.sleep(0.02)
2119 event.set()
2120 for t in threads:
2121 t.join()
2122 with self.open(support.TESTFN) as f:
2123 content = f.read()
2124 for n in range(20):
2125 self.assertEquals(content.count("Thread%03d\n" % n), 1)
2126
Antoine Pitrou19690592009-06-12 20:14:08 +00002127class CTextIOWrapperTest(TextIOWrapperTest):
2128
2129 def test_initialization(self):
2130 r = self.BytesIO(b"\xc3\xa9\n\n")
2131 b = self.BufferedReader(r, 1000)
2132 t = self.TextIOWrapper(b)
2133 self.assertRaises(TypeError, t.__init__, b, newline=42)
2134 self.assertRaises(ValueError, t.read)
2135 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2136 self.assertRaises(ValueError, t.read)
2137
2138 def test_garbage_collection(self):
2139 # C TextIOWrapper objects are collected, and collecting them flushes
2140 # all data to disk.
2141 # The Python version has __del__, so it ends in gc.garbage instead.
2142 rawio = io.FileIO(support.TESTFN, "wb")
2143 b = self.BufferedWriter(rawio)
2144 t = self.TextIOWrapper(b, encoding="ascii")
2145 t.write("456def")
2146 t.x = t
2147 wr = weakref.ref(t)
2148 del t
2149 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002150 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002151 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002152 self.assertEqual(f.read(), b"456def")
2153
2154class PyTextIOWrapperTest(TextIOWrapperTest):
2155 pass
2156
2157
2158class IncrementalNewlineDecoderTest(unittest.TestCase):
2159
2160 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002161 # UTF-8 specific tests for a newline decoder
2162 def _check_decode(b, s, **kwargs):
2163 # We exercise getstate() / setstate() as well as decode()
2164 state = decoder.getstate()
2165 self.assertEquals(decoder.decode(b, **kwargs), s)
2166 decoder.setstate(state)
2167 self.assertEquals(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002168
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002169 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002170
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002171 _check_decode(b'\xe8', "")
2172 _check_decode(b'\xa2', "")
2173 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002174
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002175 _check_decode(b'\xe8', "")
2176 _check_decode(b'\xa2', "")
2177 _check_decode(b'\x88', "\u8888")
2178
2179 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002180 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2181
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002182 decoder.reset()
2183 _check_decode(b'\n', "\n")
2184 _check_decode(b'\r', "")
2185 _check_decode(b'', "\n", final=True)
2186 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002187
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002188 _check_decode(b'\r', "")
2189 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002190
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002191 _check_decode(b'\r\r\n', "\n\n")
2192 _check_decode(b'\r', "")
2193 _check_decode(b'\r', "\n")
2194 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002195
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002196 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2197 _check_decode(b'\xe8\xa2\x88', "\u8888")
2198 _check_decode(b'\n', "\n")
2199 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2200 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002201
Antoine Pitrou19690592009-06-12 20:14:08 +00002202 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002203 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002204 if encoding is not None:
2205 encoder = codecs.getincrementalencoder(encoding)()
2206 def _decode_bytewise(s):
2207 # Decode one byte at a time
2208 for b in encoder.encode(s):
2209 result.append(decoder.decode(b))
2210 else:
2211 encoder = None
2212 def _decode_bytewise(s):
2213 # Decode one char at a time
2214 for c in s:
2215 result.append(decoder.decode(c))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002216 self.assertEquals(decoder.newlines, None)
2217 _decode_bytewise("abc\n\r")
2218 self.assertEquals(decoder.newlines, '\n')
2219 _decode_bytewise("\nabc")
2220 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2221 _decode_bytewise("abc\r")
2222 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2223 _decode_bytewise("abc")
2224 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2225 _decode_bytewise("abc\r")
2226 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2227 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002228 input = "abc"
2229 if encoder is not None:
2230 encoder.reset()
2231 input = encoder.encode(input)
2232 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002233 self.assertEquals(decoder.newlines, None)
2234
2235 def test_newline_decoder(self):
2236 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002237 # None meaning the IncrementalNewlineDecoder takes unicode input
2238 # rather than bytes input
2239 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002240 'utf-16', 'utf-16-le', 'utf-16-be',
2241 'utf-32', 'utf-32-le', 'utf-32-be',
2242 )
2243 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002244 decoder = enc and codecs.getincrementaldecoder(enc)()
2245 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2246 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002247 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002248 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2249 self.check_newline_decoding_utf8(decoder)
2250
2251 def test_newline_bytes(self):
2252 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2253 def _check(dec):
2254 self.assertEquals(dec.newlines, None)
2255 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2256 self.assertEquals(dec.newlines, None)
2257 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2258 self.assertEquals(dec.newlines, None)
2259 dec = self.IncrementalNewlineDecoder(None, translate=False)
2260 _check(dec)
2261 dec = self.IncrementalNewlineDecoder(None, translate=True)
2262 _check(dec)
2263
2264class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2265 pass
2266
2267class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2268 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002269
Christian Heimes1a6387e2008-03-26 12:49:49 +00002270
2271# XXX Tests for open()
2272
2273class MiscIOTest(unittest.TestCase):
2274
Benjamin Petersonad100c32008-11-20 22:06:22 +00002275 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002276 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002277
Antoine Pitrou19690592009-06-12 20:14:08 +00002278 def test___all__(self):
2279 for name in self.io.__all__:
2280 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002281 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002282 if name == "open":
2283 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002284 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002285 self.assertTrue(issubclass(obj, Exception), name)
2286 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002287 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002288
Benjamin Petersonad100c32008-11-20 22:06:22 +00002289 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002290 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002291 self.assertEquals(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002292 f.close()
2293
Antoine Pitrou19690592009-06-12 20:14:08 +00002294 f = self.open(support.TESTFN, "U")
2295 self.assertEquals(f.name, support.TESTFN)
2296 self.assertEquals(f.buffer.name, support.TESTFN)
2297 self.assertEquals(f.buffer.raw.name, support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002298 self.assertEquals(f.mode, "U")
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002299 self.assertEquals(f.buffer.mode, "rb")
2300 self.assertEquals(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002301 f.close()
2302
Antoine Pitrou19690592009-06-12 20:14:08 +00002303 f = self.open(support.TESTFN, "w+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002304 self.assertEquals(f.mode, "w+")
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002305 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2306 self.assertEquals(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002307
Antoine Pitrou19690592009-06-12 20:14:08 +00002308 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002309 self.assertEquals(g.mode, "wb")
2310 self.assertEquals(g.raw.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002311 self.assertEquals(g.name, f.fileno())
2312 self.assertEquals(g.raw.name, f.fileno())
2313 f.close()
2314 g.close()
2315
Antoine Pitrou19690592009-06-12 20:14:08 +00002316 def test_io_after_close(self):
2317 for kwargs in [
2318 {"mode": "w"},
2319 {"mode": "wb"},
2320 {"mode": "w", "buffering": 1},
2321 {"mode": "w", "buffering": 2},
2322 {"mode": "wb", "buffering": 0},
2323 {"mode": "r"},
2324 {"mode": "rb"},
2325 {"mode": "r", "buffering": 1},
2326 {"mode": "r", "buffering": 2},
2327 {"mode": "rb", "buffering": 0},
2328 {"mode": "w+"},
2329 {"mode": "w+b"},
2330 {"mode": "w+", "buffering": 1},
2331 {"mode": "w+", "buffering": 2},
2332 {"mode": "w+b", "buffering": 0},
2333 ]:
2334 f = self.open(support.TESTFN, **kwargs)
2335 f.close()
2336 self.assertRaises(ValueError, f.flush)
2337 self.assertRaises(ValueError, f.fileno)
2338 self.assertRaises(ValueError, f.isatty)
2339 self.assertRaises(ValueError, f.__iter__)
2340 if hasattr(f, "peek"):
2341 self.assertRaises(ValueError, f.peek, 1)
2342 self.assertRaises(ValueError, f.read)
2343 if hasattr(f, "read1"):
2344 self.assertRaises(ValueError, f.read1, 1024)
2345 if hasattr(f, "readinto"):
2346 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2347 self.assertRaises(ValueError, f.readline)
2348 self.assertRaises(ValueError, f.readlines)
2349 self.assertRaises(ValueError, f.seek, 0)
2350 self.assertRaises(ValueError, f.tell)
2351 self.assertRaises(ValueError, f.truncate)
2352 self.assertRaises(ValueError, f.write,
2353 b"" if "b" in kwargs['mode'] else "")
2354 self.assertRaises(ValueError, f.writelines, [])
2355 self.assertRaises(ValueError, next, f)
2356
2357 def test_blockingioerror(self):
2358 # Various BlockingIOError issues
2359 self.assertRaises(TypeError, self.BlockingIOError)
2360 self.assertRaises(TypeError, self.BlockingIOError, 1)
2361 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2362 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2363 b = self.BlockingIOError(1, "")
2364 self.assertEqual(b.characters_written, 0)
2365 class C(unicode):
2366 pass
2367 c = C("")
2368 b = self.BlockingIOError(1, c)
2369 c.b = b
2370 b.c = c
2371 wr = weakref.ref(c)
2372 del c, b
2373 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002374 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002375
2376 def test_abcs(self):
2377 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002378 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2379 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2380 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2381 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002382
2383 def _check_abc_inheritance(self, abcmodule):
2384 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002385 self.assertIsInstance(f, abcmodule.IOBase)
2386 self.assertIsInstance(f, abcmodule.RawIOBase)
2387 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2388 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002389 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002390 self.assertIsInstance(f, abcmodule.IOBase)
2391 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2392 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2393 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002394 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002395 self.assertIsInstance(f, abcmodule.IOBase)
2396 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2397 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2398 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002399
2400 def test_abc_inheritance(self):
2401 # Test implementations inherit from their respective ABCs
2402 self._check_abc_inheritance(self)
2403
2404 def test_abc_inheritance_official(self):
2405 # Test implementations inherit from the official ABCs of the
2406 # baseline "io" module.
2407 self._check_abc_inheritance(io)
2408
2409class CMiscIOTest(MiscIOTest):
2410 io = io
2411
2412class PyMiscIOTest(MiscIOTest):
2413 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002414
Christian Heimes1a6387e2008-03-26 12:49:49 +00002415def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00002416 tests = (CIOTest, PyIOTest,
2417 CBufferedReaderTest, PyBufferedReaderTest,
2418 CBufferedWriterTest, PyBufferedWriterTest,
2419 CBufferedRWPairTest, PyBufferedRWPairTest,
2420 CBufferedRandomTest, PyBufferedRandomTest,
2421 StatefulIncrementalDecoderTest,
2422 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2423 CTextIOWrapperTest, PyTextIOWrapperTest,
2424 CMiscIOTest, PyMiscIOTest,
2425 )
2426
2427 # Put the namespaces of the IO module we are testing and some useful mock
2428 # classes in the __dict__ of each test.
2429 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
2430 MockNonBlockWriterIO)
2431 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2432 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
2433 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
2434 globs = globals()
2435 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2436 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2437 # Avoid turning open into a bound method.
2438 py_io_ns["open"] = pyio.OpenWrapper
2439 for test in tests:
2440 if test.__name__.startswith("C"):
2441 for name, obj in c_io_ns.items():
2442 setattr(test, name, obj)
2443 elif test.__name__.startswith("Py"):
2444 for name, obj in py_io_ns.items():
2445 setattr(test, name, obj)
2446
2447 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002448
2449if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00002450 test_main()