blob: b3feb1b065c90843c7ebb9ab8ce9188625844ad3 [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000032import abc
Georg Brandla4f46e12010-02-07 17:03:15 +000033from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000034from collections import deque
35from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000036
37import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000038import io # C implementation of io
39import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000040try:
41 import threading
42except ImportError:
43 threading = None
Antoine Pitrou19690592009-06-12 20:14:08 +000044
# Python 2: a module-level ``__metaclass__`` turns every class statement in
# this module that lists no bases into a new-style class, which the mock
# classes below need for their super() calls.
__metaclass__ = type
# Rebind ``bytes`` to the helper exported by test_support.
# NOTE(review): presumably provides Python-3-style bytes() semantics on 2.x;
# confirm against test_support.py3k_bytes.
bytes = support.py3k_bytes
47
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size.

    Opens this source file in text mode through ``io.open`` and returns the
    ``_CHUNK_SIZE`` attribute of the resulting stream.  The ``with`` block
    closes the file again before the function returns.
    """
    with io.open(__file__, "r", encoding="latin1") as f:
        return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000052
53
class MockRawIO:
    """Scripted stand-in for a raw stream.

    ``read_stack`` is copied into a list; read() and readinto() consume it
    from the front.  Everything passed to write() is recorded in
    ``_write_stack`` and ``_reads`` counts the calls made to read() and
    readinto().
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0

    def read(self, n=None):
        """Return the next scripted chunk, or b"" once the script is empty.

        ``n`` is accepted for signature compatibility but ignored.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Script exhausted: behave like a stream at EOF.
            return b""

    def write(self, b):
        """Record a bytes copy of ``b`` and report it as fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the head of the script into ``buf``.

        Returns 0 when the script is empty, None when the head entry is None
        (the entry is consumed), and otherwise the number of bytes copied.
        A chunk larger than ``buf`` is split: the part that did not fit stays
        at the head of the script for the next call.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        else:
            buf[:] = data[:max_len]
            self._read_stack[0] = data[max_len:]
            return max_len

    def truncate(self, pos=None):
        """Pretend to truncate: simply echo ``pos`` back."""
        return pos
112
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with RawIOBase from ``io`` (the C implementation)."""
115
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with RawIOBase from ``_pyio`` (pure Python)."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000118
119
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods deliberately report bogus results.

    write() and read() double what MockRawIO returns, seek() and tell()
    return negative positions, and readinto() claims five times the size of
    the buffer it was given.
    """

    def write(self, b):
        return MockRawIO.write(self, b) * 2

    def read(self, n=None):
        return MockRawIO.read(self, n) * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Still consume from the script, but lie about the byte count.
        MockRawIO.readinto(self, buf)
        return len(buf) * 5
136
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with RawIOBase from ``io`` (C)."""
139
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with RawIOBase from ``_pyio`` (pure Python)."""
142
143
class CloseFailureIO(MockRawIO):
    """MockRawIO variant whose first close() call fails with IOError."""

    # Class-level default; close() shadows it with an instance attribute.
    closed = 0

    def close(self):
        """Mark the stream closed, then raise IOError (first call only)."""
        if not self.closed:
            # Set the flag before raising so that later calls do nothing.
            self.closed = 1
            raise IOError
151
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with RawIOBase from ``io`` (C)."""
154
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with RawIOBase from ``_pyio`` (pure Python)."""
157
158
class MockFileIO:
    """Mixin that logs the size of every read in ``read_history``.

    The actual I/O is delegated through super(), so this class must be
    combined with a base that implements __init__(data), read() and
    readinto().
    """

    def __init__(self, data):
        # Create the log before delegating so it exists on the instance
        # before the cooperating base class initialises itself.
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        """Delegate the read and log len(result), or None for a None result."""
        res = super(MockFileIO, self).read(n)
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        """Delegate the readinto and log whatever count it returned."""
        res = super(MockFileIO, self).readinto(b)
        self.read_history.append(res)
        return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000174
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO layered over BytesIO from ``io`` (C)."""
Christian Heimes1a6387e2008-03-26 12:49:49 +0000177
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO layered over BytesIO from ``_pyio`` (pure Python)."""
180
181
class MockNonBlockWriterIO:
    """Writable mock that can be told to "block" on a chosen byte.

    Written data accumulates in ``_write_stack``.  After block_on(char), the
    next write() containing ``char`` stores only the data before it and
    raises ``self.BlockingIOError``; that exception class is expected to be
    supplied as an attribute by the concrete subclass.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and empty the record."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record ``b``; raise BlockingIOError if the blocker char is in it.

        On blocking, only the bytes before the blocker char are recorded, the
        blocker is disarmed, and the exception carries that byte count as its
        third argument.  Otherwise the whole of ``b`` is recorded and its
        length is returned.
        """
        b = bytes(b)
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                # Blocker char not in this chunk: fall through, write it all.
                pass
            else:
                self._blocker_char = None
                self._write_stack.append(b[:n])
                raise self.BlockingIOError(0, "test blocking", n)
        self._write_stack.append(b)
        return len(b)
220
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO for ``io`` (C); raises io.BlockingIOError."""
    BlockingIOError = io.BlockingIOError
223
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO for ``_pyio``; raises pyio.BlockingIOError."""
    BlockingIOError = pyio.BlockingIOError
226
Christian Heimes1a6387e2008-03-26 12:49:49 +0000227
class IOTest(unittest.TestCase):
    """General tests of open(), FileIO, BytesIO and the abstract bases.

    The objects under test are looked up on ``self`` (``self.open``,
    ``self.FileIO``, ``self.BytesIO``, ``self.IOBase`` and friends).
    NOTE(review): those attributes are not defined in this class; presumably
    they are injected per implementation elsewhere in the file -- confirm.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate script against writable ``f``."""
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() is expected to leave the stream position untouched.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/seek script against ``f`` holding b"hello world\\n".

        With ``buffered`` true, argument-less read() calls are checked too.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file tests: just past the 32-bit signed range.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek/write/truncate around the ``LARGE`` offset of ``f``."""
        # assertTrue rather than a bare assert, so the check survives -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a real skip instead of printing and passing.
                self.skipTest(
                    "large file ops on %s require %d bytes and a long time; "
                    "use 'regrtest.py -u largefile test_io' to run them"
                    % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1 // 0
            except ZeroDivisionError:
                # The file must be closed even when the body raised.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1 // 0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyFileIO, self).__del__
                except AttributeError:
                    # The base class may not define __del__ at all.
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check that destroying a ``base`` subclass runs __del__, close, flush."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array(b'i', range(10))
        n = len(a.tostring())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Second wrapper around the same descriptor, not owning it.
            fd_file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fd_file.read(), "egg\n")
            fd_file.seek(0)
            fd_file.close()
            self.assertRaises(ValueError, fd_file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r",
                          closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            fd_file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(fd_file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Reference cycle: only the garbage collector can reclaim f now.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000543
class CIOTest(IOTest):
    """IOTest variant for the C implementation.

    NOTE(review): adds nothing itself; the implementation-specific attributes
    are presumably attached elsewhere -- confirm.
    """
Christian Heimes1a6387e2008-03-26 12:49:49 +0000546
class PyIOTest(IOTest):
    """IOTest variant for the Python implementation; skips the array test."""

    # Replace the inherited test with an unconditionally skipped wrapper.
    test_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000551
552
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # Mixin: the buffered class under test is read from ``self.tp`` and the
    # raw mocks from ``self.MockRawIO`` / ``self.CloseFailureIO``; the assert
    # methods come from the unittest.TestCase this is combined with.

    def test_detach(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        # A second detach() has nothing left to hand back.
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyBufferedIO, self).__del__
                except AttributeError:
                    # The base class may not define __del__ at all.
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyBufferedIO, self).close()
            def flush(self):
                record.append(3)
                super(MyBufferedIO, self).flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        # Query before deletion; flush() is only expected on writable objects.
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000644
645
Antoine Pitrou19690592009-06-12 20:14:08 +0000646class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
647 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000648
Antoine Pitrou19690592009-06-12 20:14:08 +0000649 def test_constructor(self):
650 rawio = self.MockRawIO([b"abc"])
651 bufio = self.tp(rawio)
652 bufio.__init__(rawio)
653 bufio.__init__(rawio, buffer_size=1024)
654 bufio.__init__(rawio, buffer_size=16)
655 self.assertEquals(b"abc", bufio.read())
656 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
657 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
658 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
659 rawio = self.MockRawIO([b"abc"])
660 bufio.__init__(rawio)
661 self.assertEquals(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000662
Antoine Pitrou19690592009-06-12 20:14:08 +0000663 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000664 for arg in (None, 7):
665 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
666 bufio = self.tp(rawio)
667 self.assertEquals(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000668 # Invalid args
669 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000670
Antoine Pitrou19690592009-06-12 20:14:08 +0000671 def test_read1(self):
672 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
673 bufio = self.tp(rawio)
674 self.assertEquals(b"a", bufio.read(1))
675 self.assertEquals(b"b", bufio.read1(1))
676 self.assertEquals(rawio._reads, 1)
677 self.assertEquals(b"c", bufio.read1(100))
678 self.assertEquals(rawio._reads, 1)
679 self.assertEquals(b"d", bufio.read1(100))
680 self.assertEquals(rawio._reads, 2)
681 self.assertEquals(b"efg", bufio.read1(100))
682 self.assertEquals(rawio._reads, 3)
683 self.assertEquals(b"", bufio.read1(100))
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000684 self.assertEquals(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000685 # Invalid args
686 self.assertRaises(ValueError, bufio.read1, -1)
687
688 def test_readinto(self):
689 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
690 bufio = self.tp(rawio)
691 b = bytearray(2)
692 self.assertEquals(bufio.readinto(b), 2)
693 self.assertEquals(b, b"ab")
694 self.assertEquals(bufio.readinto(b), 2)
695 self.assertEquals(b, b"cd")
696 self.assertEquals(bufio.readinto(b), 2)
697 self.assertEquals(b, b"ef")
698 self.assertEquals(bufio.readinto(b), 1)
699 self.assertEquals(b, b"gf")
700 self.assertEquals(bufio.readinto(b), 0)
701 self.assertEquals(b, b"gf")
702
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000703 def test_readlines(self):
704 def bufio():
705 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
706 return self.tp(rawio)
707 self.assertEquals(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
708 self.assertEquals(bufio().readlines(5), [b"abc\n", b"d\n"])
709 self.assertEquals(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
710
Antoine Pitrou19690592009-06-12 20:14:08 +0000711 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000712 data = b"abcdefghi"
713 dlen = len(data)
714
715 tests = [
716 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
717 [ 100, [ 3, 3, 3], [ dlen ] ],
718 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
719 ]
720
721 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000722 rawio = self.MockFileIO(data)
723 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000724 pos = 0
725 for nbytes in buf_read_sizes:
726 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
727 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000728 # this is mildly implementation-dependent
Christian Heimes1a6387e2008-03-26 12:49:49 +0000729 self.assertEquals(rawio.read_history, raw_read_sizes)
730
Antoine Pitrou19690592009-06-12 20:14:08 +0000731 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000732 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000733 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
734 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000735
736 self.assertEquals(b"abcd", bufio.read(6))
737 self.assertEquals(b"e", bufio.read(1))
738 self.assertEquals(b"fg", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000739 self.assertEquals(b"", bufio.peek(1))
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000740 self.assertTrue(None is bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000741 self.assertEquals(b"", bufio.read())
742
Antoine Pitrou19690592009-06-12 20:14:08 +0000743 def test_read_past_eof(self):
744 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
745 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000746
747 self.assertEquals(b"abcdefg", bufio.read(9000))
748
Antoine Pitrou19690592009-06-12 20:14:08 +0000749 def test_read_all(self):
750 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
751 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000752
753 self.assertEquals(b"abcdefg", bufio.read())
754
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads(self):
    # Twenty threads read concurrently from one buffered object; afterwards
    # every byte value must have been seen exactly as often as it was
    # written, i.e. nothing was duplicated or lost.
    try:
        # Write out many bytes with exactly the same number of 0's,
        # 1's... 255's.
        repeat = 1000
        values = list(range(256)) * repeat
        random.shuffle(values)
        payload = bytes(bytearray(values))
        with self.open(support.TESTFN, "wb") as out:
            out.write(payload)
        with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []
            results = []

            def consume():
                try:
                    # Intra-buffer read then buffer-flushing read
                    for size in cycle([1, 19]):
                        chunk = bufio.read(size)
                        if not chunk:
                            break
                        # list.append() is atomic
                        results.append(chunk)
                except Exception as exc:
                    errors.append(exc)
                    raise

            threads = [threading.Thread(target=consume) for _ in range(20)]
            for worker in threads:
                worker.start()
            time.sleep(0.02) # yield
            for worker in threads:
                worker.join()
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            merged = b''.join(results)
            for value in range(256):
                needle = bytes(bytearray([value]))
                self.assertEqual(merged.count(needle), repeat)
    finally:
        support.unlink(support.TESTFN)
797
def test_misbehaved_io(self):
    # Both seek() and tell() must raise IOError when the raw stream is a
    # MisbehavedRawIO.
    misbehaved = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(misbehaved)
    with self.assertRaises(IOError):
        bufio.seek(0)
    with self.assertRaises(IOError):
        bufio.tell()
802 self.assertRaises(IOError, bufio.tell)
803
class CBufferedReaderTest(BufferedReaderTest):
    """Run the BufferedReader tests against io.BufferedReader (C version)."""
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialization the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # Opening with "w+b" creates TESTFN on disk; schedule its removal so
        # the test does not leave the file behind, pass or fail.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.f = f  # reference cycle: only the cyclic GC can reclaim f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000844
class PyBufferedReaderTest(BufferedReaderTest):
    """Run the BufferedReader tests against pyio.BufferedReader."""

    tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000847
848
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    """Implementation-independent tests for buffered writers.

    Concrete subclasses set ``tp`` to the class under test.  The tests use
    ``self.MockRawIO``, ``self.open`` and similar attributes that are not
    defined in this class.
    """
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        # Non-positive buffer sizes are rejected ...
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        # ... and a later valid __init__ makes the object usable again.
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push buffered data down to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every single write.
        # NOTE(review): bytes(range(256)) needs Python 3 semantics for
        # bytes(iterable); presumably the module rebinds ``bytes`` -- confirm.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for _ in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents,
            b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Absolute seeks around the current position, ending where we began.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        # Same idea with relative seeks.
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN; make sure it is removed afterwards.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit a DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001064
1065
class CBufferedWriterTest(BufferedWriterTest):
    """Run the BufferedWriter tests against io.BufferedWriter (C version)."""
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialization the object must refuse to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # Opening with "w+b" creates TESTFN on disk; schedule its removal so
        # the test does not leave the file behind, pass or fail.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        f.x = f  # reference cycle: only the cyclic GC can reclaim f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1103
1104
class PyBufferedWriterTest(BufferedWriterTest):
    """Run the BufferedWriter tests against pyio.BufferedWriter."""

    tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001107
class BufferedRWPairTest(unittest.TestCase):
    """Tests for BufferedRWPair; concrete subclasses set ``tp``."""

    def test_constructor(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)

    def test_detach(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        with self.assertRaises(self.UnsupportedOperation):
            rw_pair.detach()

    def test_constructor_max_buffer_size_deprecation(self):
        expected_warning = ("max_buffer_size is deprecated",
                            DeprecationWarning)
        with support.check_warnings(expected_warning):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        # Build both halves up front so only the constructor call itself
        # sits under the assertRaises check.
        reader, writer = NotReadable(), self.MockRawIO()
        with self.assertRaises(IOError):
            self.tp(reader, writer)

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        reader, writer = self.MockRawIO(), NotWriteable()
        with self.assertRaises(IOError):
            self.tp(reader, writer)

    def test_read(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(rw_pair.read(3), b"abc")
        self.assertEqual(rw_pair.read(1), b"d")
        self.assertEqual(rw_pair.read(), b"ef")
        rw_pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(rw_pair.read(None), b"abc")

    def test_readlines(self):
        # Every check gets a fresh pair over the same three-line input.
        def fresh_pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        all_lines = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(fresh_pair().readlines(), all_lines)
        self.assertEqual(fresh_pair().readlines(), all_lines)
        self.assertEqual(fresh_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(rw_pair.read1(3), b"abc")

    def test_readinto(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        target = bytearray(5)
        self.assertEqual(rw_pair.readinto(target), 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        rw_pair = self.tp(self.MockRawIO(), sink)

        for chunk in (b"abc", b"def"):
            rw_pair.write(chunk)
            rw_pair.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        peeked = rw_pair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        self.assertEqual(rw_pair.read(3), b"abc")

    def test_readable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.readable())

    def test_writeable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)
        rw_pair.close()
        self.assertTrue(rw_pair.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # (reader is a tty, writer is a tty, pair must report a tty)
        scenarios = [
            (False, False, False),
            (True, False, True),
            (False, True, True),
            (True, True, True),
        ]
        for reader_tty, writer_tty, pair_tty in scenarios:
            rw_pair = self.tp(SelectableIsAtty(reader_tty),
                              SelectableIsAtty(writer_tty))
            if pair_tty:
                self.assertTrue(rw_pair.isatty())
            else:
                self.assertFalse(rw_pair.isatty())
1225
class CBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPair tests against io.BufferedRWPair (C version)."""

    tp = io.BufferedRWPair
1228
class PyBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPair tests against pyio.BufferedRWPair."""

    tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001231
1232
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    """Tests for buffered objects that both read and write.

    Inherits every reader and writer test; where both bases define the same
    test, the override here runs both versions explicitly.
    """
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        # A float offset is not acceptable.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # read_func(bufio[, n]) must consume and return up to n bytes; it
        # lets the same scenario run against read(), readinto() and peek().
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        # Second variant: step back one byte, peek, then restore the position.
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)
1387
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
    """Run the BufferedRandom tests against io.BufferedRandom (C version)."""

    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        acceptable = (OverflowError, MemoryError, ValueError)
        self.assertRaises(acceptable,
                          buffered.__init__, raw_stream, sys.maxsize)

    def test_garbage_collection(self):
        # Run both inherited flavours, reader first.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1404
class PyBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against pyio.BufferedRandom."""

    tp = pyio.BufferedRandom
1407
1408
Christian Heimes1a6387e2008-03-26 12:49:49 +00001409# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1410# properties:
1411# - A single output character can correspond to many bytes of input.
1412# - The number of input bytes to complete the character can be
1413# undetermined until the last input byte is received.
1414# - The number of input bytes can vary depending on previous input.
1415# - A single input byte can correspond to many characters of output.
1416# - The number of output characters can be undetermined until the
1417# last input byte is received.
1418# - The number of output characters can vary depending on previous input.
1419
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags); flags encodes I and O as one int."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes in *input* and return the text decoded so far.

        Bytes that do not yet complete a word stay buffered; when *final*
        is true, a trailing partial word is emitted as well.
        """
        output = ''
        # Iterate over integer byte values.  Iterating the input directly
        # yields 1-char strings on Python 2 but ints on Python 3, where the
        # comparison against a text '.' would never match.
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        'i...' and 'o...' words update I and O and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The lookup hook below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: return a CodecInfo for 'test_decoder'.

        Returns None (implicitly) for any other name or while
        codecEnabled is false.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
1504
# Register the previous decoder for testing.
# Disabled by default, tests will enable it: the lookup hook only answers
# while StatefulIncrementalDecoder.codecEnabled is true, and that flag
# starts out False.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1508
1509
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', 1), 'abcd.')
1552
1553class TextIOWrapperTest(unittest.TestCase):
1554
def setUp(self):
    # Start from a clean slate: no stale TESTFN, plus sample data holding
    # every newline flavour and its "\n"-normalized text form.
    support.unlink(support.TESTFN)
    raw_sample = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    normalized_bytes = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.testdata = raw_sample
    self.normalized = normalized_bytes.decode("ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001559
def tearDown(self):
    # Remove the scratch file a test may have created.
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001562
Antoine Pitrou19690592009-06-12 20:14:08 +00001563 def test_constructor(self):
1564 r = self.BytesIO(b"\xc3\xa9\n\n")
1565 b = self.BufferedReader(r, 1000)
1566 t = self.TextIOWrapper(b)
1567 t.__init__(b, encoding="latin1", newline="\r\n")
1568 self.assertEquals(t.encoding, "latin1")
1569 self.assertEquals(t.line_buffering, False)
1570 t.__init__(b, encoding="utf8", line_buffering=True)
1571 self.assertEquals(t.encoding, "utf8")
1572 self.assertEquals(t.line_buffering, True)
1573 self.assertEquals("\xe9\n", t.readline())
1574 self.assertRaises(TypeError, t.__init__, b, newline=42)
1575 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1576
1577 def test_detach(self):
1578 r = self.BytesIO()
1579 b = self.BufferedWriter(r)
1580 t = self.TextIOWrapper(b)
1581 self.assertIs(t.detach(), b)
1582
1583 t = self.TextIOWrapper(b, encoding="ascii")
1584 t.write("howdy")
1585 self.assertFalse(r.getvalue())
1586 t.detach()
1587 self.assertEqual(r.getvalue(), b"howdy")
1588 self.assertRaises(ValueError, t.detach)
1589
1590 def test_repr(self):
1591 raw = self.BytesIO("hello".encode("utf-8"))
1592 b = self.BufferedReader(raw)
1593 t = self.TextIOWrapper(b, encoding="utf-8")
1594 modname = self.TextIOWrapper.__module__
1595 self.assertEqual(repr(t),
1596 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1597 raw.name = "dummy"
1598 self.assertEqual(repr(t),
1599 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1600 raw.name = b"dummy"
1601 self.assertEqual(repr(t),
1602 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1603
1604 def test_line_buffering(self):
1605 r = self.BytesIO()
1606 b = self.BufferedWriter(r, 1000)
1607 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1608 t.write("X")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001609 self.assertEquals(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001610 t.write("Y\nZ")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001611 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001612 t.write("A\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001613 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1614
Antoine Pitrou19690592009-06-12 20:14:08 +00001615 def test_encoding(self):
1616 # Check the encoding attribute is always set, and valid
1617 b = self.BytesIO()
1618 t = self.TextIOWrapper(b, encoding="utf8")
1619 self.assertEqual(t.encoding, "utf8")
1620 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001621 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001622 codecs.lookup(t.encoding)
1623
1624 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001625 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001626 b = self.BytesIO(b"abc\n\xff\n")
1627 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001628 self.assertRaises(UnicodeError, t.read)
1629 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001630 b = self.BytesIO(b"abc\n\xff\n")
1631 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001632 self.assertRaises(UnicodeError, t.read)
1633 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001634 b = self.BytesIO(b"abc\n\xff\n")
1635 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001636 self.assertEquals(t.read(), "abc\n\n")
1637 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001638 b = self.BytesIO(b"abc\n\xff\n")
1639 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
1640 self.assertEquals(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001641
Antoine Pitrou19690592009-06-12 20:14:08 +00001642 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001643 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001644 b = self.BytesIO()
1645 t = self.TextIOWrapper(b, encoding="ascii")
1646 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001647 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001648 b = self.BytesIO()
1649 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1650 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001651 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001652 b = self.BytesIO()
1653 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001654 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001655 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001656 t.flush()
1657 self.assertEquals(b.getvalue(), b"abcdef\n")
1658 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001659 b = self.BytesIO()
1660 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001661 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001662 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001663 t.flush()
1664 self.assertEquals(b.getvalue(), b"abc?def\n")
1665
Antoine Pitrou19690592009-06-12 20:14:08 +00001666 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001667 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1668
1669 tests = [
1670 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1671 [ '', input_lines ],
1672 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1673 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1674 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1675 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001676 encodings = (
1677 'utf-8', 'latin-1',
1678 'utf-16', 'utf-16-le', 'utf-16-be',
1679 'utf-32', 'utf-32-le', 'utf-32-be',
1680 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001681
1682 # Try a range of buffer sizes to test the case where \r is the last
1683 # character in TextIOWrapper._pending_line.
1684 for encoding in encodings:
1685 # XXX: str.encode() should return bytes
1686 data = bytes(''.join(input_lines).encode(encoding))
1687 for do_reads in (False, True):
1688 for bufsize in range(1, 10):
1689 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001690 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1691 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001692 encoding=encoding)
1693 if do_reads:
1694 got_lines = []
1695 while True:
1696 c2 = textio.read(2)
1697 if c2 == '':
1698 break
1699 self.assertEquals(len(c2), 2)
1700 got_lines.append(c2 + textio.readline())
1701 else:
1702 got_lines = list(textio)
1703
1704 for got_line, exp_line in zip(got_lines, exp_lines):
1705 self.assertEquals(got_line, exp_line)
1706 self.assertEquals(len(got_lines), len(exp_lines))
1707
Antoine Pitrou19690592009-06-12 20:14:08 +00001708 def test_newlines_input(self):
1709 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001710 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1711 for newline, expected in [
1712 (None, normalized.decode("ascii").splitlines(True)),
1713 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001714 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1715 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1716 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001717 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001718 buf = self.BytesIO(testdata)
1719 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001720 self.assertEquals(txt.readlines(), expected)
1721 txt.seek(0)
1722 self.assertEquals(txt.read(), "".join(expected))
1723
Antoine Pitrou19690592009-06-12 20:14:08 +00001724 def test_newlines_output(self):
1725 testdict = {
1726 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1727 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1728 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1729 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1730 }
1731 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1732 for newline, expected in tests:
1733 buf = self.BytesIO()
1734 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1735 txt.write("AAA\nB")
1736 txt.write("BB\nCCC\n")
1737 txt.write("X\rY\r\nZ")
1738 txt.flush()
1739 self.assertEquals(buf.closed, False)
1740 self.assertEquals(buf.getvalue(), expected)
1741
1742 def test_destructor(self):
1743 l = []
1744 base = self.BytesIO
1745 class MyBytesIO(base):
1746 def close(self):
1747 l.append(self.getvalue())
1748 base.close(self)
1749 b = MyBytesIO()
1750 t = self.TextIOWrapper(b, encoding="ascii")
1751 t.write("abc")
1752 del t
1753 support.gc_collect()
1754 self.assertEquals([b"abc"], l)
1755
1756 def test_override_destructor(self):
1757 record = []
1758 class MyTextIO(self.TextIOWrapper):
1759 def __del__(self):
1760 record.append(1)
1761 try:
1762 f = super(MyTextIO, self).__del__
1763 except AttributeError:
1764 pass
1765 else:
1766 f()
1767 def close(self):
1768 record.append(2)
1769 super(MyTextIO, self).close()
1770 def flush(self):
1771 record.append(3)
1772 super(MyTextIO, self).flush()
1773 b = self.BytesIO()
1774 t = MyTextIO(b, encoding="ascii")
1775 del t
1776 support.gc_collect()
1777 self.assertEqual(record, [1, 2, 3])
1778
1779 def test_error_through_destructor(self):
1780 # Test that the exception state is not modified by a destructor,
1781 # even if close() fails.
1782 rawio = self.CloseFailureIO()
1783 def f():
1784 self.TextIOWrapper(rawio).xyzzy
1785 with support.captured_output("stderr") as s:
1786 self.assertRaises(AttributeError, f)
1787 s = s.getvalue().strip()
1788 if s:
1789 # The destructor *may* have printed an unraisable error, check it
1790 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001791 self.assertTrue(s.startswith("Exception IOError: "), s)
1792 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001793
1794 # Systematic tests of the text I/O API
1795
Antoine Pitrou19690592009-06-12 20:14:08 +00001796 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001797 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1798 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00001799 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001800 f._CHUNK_SIZE = chunksize
Antoine Pitrou19690592009-06-12 20:14:08 +00001801 self.assertEquals(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001802 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001803 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001804 f._CHUNK_SIZE = chunksize
1805 self.assertEquals(f.tell(), 0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001806 self.assertEquals(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001807 cookie = f.tell()
1808 self.assertEquals(f.seek(0), 0)
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001809 self.assertEquals(f.read(None), "abc")
1810 f.seek(0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001811 self.assertEquals(f.read(2), "ab")
1812 self.assertEquals(f.read(1), "c")
1813 self.assertEquals(f.read(1), "")
1814 self.assertEquals(f.read(), "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001815 self.assertEquals(f.tell(), cookie)
1816 self.assertEquals(f.seek(0), 0)
1817 self.assertEquals(f.seek(0, 2), cookie)
Antoine Pitrou19690592009-06-12 20:14:08 +00001818 self.assertEquals(f.write("def"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001819 self.assertEquals(f.seek(cookie), cookie)
Antoine Pitrou19690592009-06-12 20:14:08 +00001820 self.assertEquals(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001821 if enc.startswith("utf"):
1822 self.multi_line_test(f, enc)
1823 f.close()
1824
1825 def multi_line_test(self, f, enc):
1826 f.seek(0)
1827 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00001828 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001829 wlines = []
1830 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1831 chars = []
1832 for i in range(size):
1833 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00001834 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001835 wlines.append((f.tell(), line))
1836 f.write(line)
1837 f.seek(0)
1838 rlines = []
1839 while True:
1840 pos = f.tell()
1841 line = f.readline()
1842 if not line:
1843 break
1844 rlines.append((pos, line))
1845 self.assertEquals(rlines, wlines)
1846
Antoine Pitrou19690592009-06-12 20:14:08 +00001847 def test_telling(self):
1848 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001849 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001850 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001851 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001852 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001853 p2 = f.tell()
1854 f.seek(0)
1855 self.assertEquals(f.tell(), p0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001856 self.assertEquals(f.readline(), "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001857 self.assertEquals(f.tell(), p1)
Antoine Pitrou19690592009-06-12 20:14:08 +00001858 self.assertEquals(f.readline(), "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001859 self.assertEquals(f.tell(), p2)
1860 f.seek(0)
1861 for line in f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001862 self.assertEquals(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001863 self.assertRaises(IOError, f.tell)
1864 self.assertEquals(f.tell(), p2)
1865 f.close()
1866
Antoine Pitrou19690592009-06-12 20:14:08 +00001867 def test_seeking(self):
1868 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00001869 prefix_size = chunk_size - 2
1870 u_prefix = "a" * prefix_size
1871 prefix = bytes(u_prefix.encode("utf-8"))
1872 self.assertEquals(len(u_prefix), len(prefix))
1873 u_suffix = "\u8888\n"
1874 suffix = bytes(u_suffix.encode("utf-8"))
1875 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00001876 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001877 f.write(line*2)
1878 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001879 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001880 s = f.read(prefix_size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001881 self.assertEquals(s, prefix.decode("ascii"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001882 self.assertEquals(f.tell(), prefix_size)
1883 self.assertEquals(f.readline(), u_suffix)
1884
Antoine Pitrou19690592009-06-12 20:14:08 +00001885 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001886 # Regression test for a specific bug
1887 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00001888 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001889 f.write(data)
1890 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001891 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001892 f._CHUNK_SIZE # Just test that it exists
1893 f._CHUNK_SIZE = 2
1894 f.readline()
1895 f.tell()
1896
Antoine Pitrou19690592009-06-12 20:14:08 +00001897 def test_seek_and_tell(self):
1898 #Test seek/tell using the StatefulIncrementalDecoder.
1899 # Make test faster by doing smaller seeks
1900 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00001901
Antoine Pitrou19690592009-06-12 20:14:08 +00001902 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001903 """Tell/seek to various points within a data stream and ensure
1904 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00001905 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001906 f.write(data)
1907 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001908 f = self.open(support.TESTFN, encoding='test_decoder')
1909 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00001910 decoded = f.read()
1911 f.close()
1912
1913 for i in range(min_pos, len(decoded) + 1): # seek positions
1914 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00001915 f = self.open(support.TESTFN, encoding='test_decoder')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001916 self.assertEquals(f.read(i), decoded[:i])
1917 cookie = f.tell()
1918 self.assertEquals(f.read(j), decoded[i:i + j])
1919 f.seek(cookie)
1920 self.assertEquals(f.read(), decoded[i:])
1921 f.close()
1922
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001923 # Enable the test decoder.
1924 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00001925
1926 # Run the tests.
1927 try:
1928 # Try each test case.
1929 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00001930 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001931
1932 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00001933 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1934 offset = CHUNK_SIZE - len(input)//2
1935 prefix = b'.'*offset
1936 # Don't bother seeking into the prefix (takes too long).
1937 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00001938 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001939
1940 # Ensure our test decoder won't interfere with subsequent tests.
1941 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001942 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00001943
Antoine Pitrou19690592009-06-12 20:14:08 +00001944 def test_encoded_writes(self):
1945 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001946 tests = ("utf-16",
1947 "utf-16-le",
1948 "utf-16-be",
1949 "utf-32",
1950 "utf-32-le",
1951 "utf-32-be")
1952 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001953 buf = self.BytesIO()
1954 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001955 # Check if the BOM is written only once (see issue1753).
1956 f.write(data)
1957 f.write(data)
1958 f.seek(0)
1959 self.assertEquals(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00001960 f.seek(0)
1961 self.assertEquals(f.read(), data * 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001962 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1963
Antoine Pitrou19690592009-06-12 20:14:08 +00001964 def test_unreadable(self):
1965 class UnReadable(self.BytesIO):
1966 def readable(self):
1967 return False
1968 txt = self.TextIOWrapper(UnReadable())
1969 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001970
Antoine Pitrou19690592009-06-12 20:14:08 +00001971 def test_read_one_by_one(self):
1972 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001973 reads = ""
1974 while True:
1975 c = txt.read(1)
1976 if not c:
1977 break
1978 reads += c
1979 self.assertEquals(reads, "AA\nBB")
1980
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001981 def test_readlines(self):
1982 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
1983 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
1984 txt.seek(0)
1985 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
1986 txt.seek(0)
1987 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
1988
Christian Heimes1a6387e2008-03-26 12:49:49 +00001989 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00001990 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001991 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00001992 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001993 reads = ""
1994 while True:
1995 c = txt.read(128)
1996 if not c:
1997 break
1998 reads += c
1999 self.assertEquals(reads, "A"*127+"\nB")
2000
2001 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002002 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002003
2004 # read one char at a time
2005 reads = ""
2006 while True:
2007 c = txt.read(1)
2008 if not c:
2009 break
2010 reads += c
2011 self.assertEquals(reads, self.normalized)
2012
2013 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002014 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002015 txt._CHUNK_SIZE = 4
2016
2017 reads = ""
2018 while True:
2019 c = txt.read(4)
2020 if not c:
2021 break
2022 reads += c
2023 self.assertEquals(reads, self.normalized)
2024
2025 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002026 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002027 txt._CHUNK_SIZE = 4
2028
2029 reads = txt.read(4)
2030 reads += txt.read(4)
2031 reads += txt.readline()
2032 reads += txt.readline()
2033 reads += txt.readline()
2034 self.assertEquals(reads, self.normalized)
2035
2036 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002037 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002038 txt._CHUNK_SIZE = 4
2039
2040 reads = txt.read(4)
2041 reads += txt.read()
2042 self.assertEquals(reads, self.normalized)
2043
2044 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002045 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002046 txt._CHUNK_SIZE = 4
2047
2048 reads = txt.read(4)
2049 pos = txt.tell()
2050 txt.seek(0)
2051 txt.seek(pos)
2052 self.assertEquals(txt.read(4), "BBB\n")
2053
2054 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002055 buffer = self.BytesIO(self.testdata)
2056 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002057
2058 self.assertEqual(buffer.seekable(), txt.seekable())
2059
Antoine Pitrou19690592009-06-12 20:14:08 +00002060 @unittest.skip("Issue #6213 with incremental encoders")
2061 def test_append_bom(self):
2062 # The BOM is not written again when appending to a non-empty file
2063 filename = support.TESTFN
2064 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2065 with self.open(filename, 'w', encoding=charset) as f:
2066 f.write('aaa')
2067 pos = f.tell()
2068 with self.open(filename, 'rb') as f:
2069 self.assertEquals(f.read(), 'aaa'.encode(charset))
2070
2071 with self.open(filename, 'a', encoding=charset) as f:
2072 f.write('xxx')
2073 with self.open(filename, 'rb') as f:
2074 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
2075
2076 @unittest.skip("Issue #6213 with incremental encoders")
2077 def test_seek_bom(self):
2078 # Same test, but when seeking manually
2079 filename = support.TESTFN
2080 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2081 with self.open(filename, 'w', encoding=charset) as f:
2082 f.write('aaa')
2083 pos = f.tell()
2084 with self.open(filename, 'r+', encoding=charset) as f:
2085 f.seek(pos)
2086 f.write('zzz')
2087 f.seek(0)
2088 f.write('bbb')
2089 with self.open(filename, 'rb') as f:
2090 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2091
2092 def test_errors_property(self):
2093 with self.open(support.TESTFN, "w") as f:
2094 self.assertEqual(f.errors, "strict")
2095 with self.open(support.TESTFN, "w", errors="replace") as f:
2096 self.assertEqual(f.errors, "replace")
2097
Victor Stinner6a102812010-04-27 23:55:59 +00002098 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002099 def test_threads_write(self):
2100 # Issue6750: concurrent writes could duplicate data
2101 event = threading.Event()
2102 with self.open(support.TESTFN, "w", buffering=1) as f:
2103 def run(n):
2104 text = "Thread%03d\n" % n
2105 event.wait()
2106 f.write(text)
2107 threads = [threading.Thread(target=lambda n=x: run(n))
2108 for x in range(20)]
2109 for t in threads:
2110 t.start()
2111 time.sleep(0.02)
2112 event.set()
2113 for t in threads:
2114 t.join()
2115 with self.open(support.TESTFN) as f:
2116 content = f.read()
2117 for n in range(20):
2118 self.assertEquals(content.count("Thread%03d\n" % n), 1)
2119
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapper tests that apply only to the C implementation."""

    def test_initialization(self):
        # After a failed __init__() call the object must refuse to read.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.read)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
        self.assertRaises(ValueError, t.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        b = self.BufferedWriter(rawio)
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("456def")
        # Create a reference cycle so that only the cyclic garbage
        # collector can reclaim the object.
        t.x = t
        wr = weakref.ref(t)
        del t
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
2146
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Runs the shared TextIOWrapper tests; adds no tests of its own."""
2149
2150
class IncrementalNewlineDecoderTest(unittest.TestCase):
    """Tests for IncrementalNewlineDecoder shared by both implementations.

    The class under test is looked up as ``self.IncrementalNewlineDecoder``.
    Nothing in this class defines that attribute, so it must be supplied
    from outside before the tests can run.
    """

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        _check_decode(b'\xe8\xa2\x88', "\u8888")

        # A multi-byte character fed one byte at a time.
        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        # A truncated character must be reported at end of input.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        # A trailing '\r' is held back until the next input (or final=True)
        # shows whether it starts a '\r\n' pair.
        decoder.reset()
        _check_decode(b'\n', "\n")
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        # Feed `decoder` piecewise and check both the translated output and
        # the `newlines` attribute. `encoding` names the codec used to
        # produce the input, or is None when the decoder is fed text
        # directly.
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(b))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertIsNone(decoder.newlines)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        # reset() must also forget which newline kinds were seen.
        decoder.reset()
        input = "abc"
        if encoder is not None:
            encoder.reset()
            input = encoder.encode(input)
        self.assertEqual(decoder.decode(input), "abc")
        self.assertIsNone(decoder.newlines)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            # `enc and ...` leaves decoder as None when enc is None.
            decoder = enc and codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
            self.check_newline_decoding(decoder, enc)
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 are not newlines and must pass through
            # unchanged without being recorded in `newlines`.
            self.assertIsNone(dec.newlines)
            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
            self.assertIsNone(dec.newlines)
            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
            self.assertIsNone(dec.newlines)
        dec = self.IncrementalNewlineDecoder(None, translate=False)
        _check(dec)
        dec = self.IncrementalNewlineDecoder(None, translate=True)
        _check(dec)
2256
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Runs the shared newline decoder tests; adds no tests of its own."""
2259
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Runs the shared newline decoder tests; adds no tests of its own."""
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002262
Christian Heimes1a6387e2008-03-26 12:49:49 +00002263
2264# XXX Tests for open()
2265
class MiscIOTest(unittest.TestCase):
    """Miscellaneous checks of the io namespace and of objects made by open().

    Concrete subclasses set ``io`` to the module under test.  The other
    names reached through ``self`` (``open``, ``IOBase``,
    ``BlockingIOError``, ...) are attached to those subclasses by
    test_main() before the tests run.
    """

    def tearDown(self):
        # The tests share one scratch file; remove it after each test.
        support.unlink(support.TESTFN)

    def test___all__(self):
        """Every name listed in __all__ exists and has the expected kind."""
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase), name)

    def test_attributes(self):
        """Check the ``name`` and ``mode`` attributes at every layer."""
        # ``with`` guarantees the files are closed even when an assertion
        # fails, so tearDown() can always unlink the scratch file.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second object on the same descriptor; closefd=False keeps
            # closing g from closing the descriptor that f still owns.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        """Every I/O method raises ValueError once the file is closed."""
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek/read1/readinto only exist on some of the layers.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Binary files take bytes, text files take unicode.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        """Various BlockingIOError issues."""
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)

        # Build a reference cycle through the exception and check that the
        # garbage collector is able to reclaim it.
        class C(unicode):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        """Test the visible base classes are ABCs."""
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check raw, buffered and text files against abcmodule's ABCs.

        ``abcmodule`` is any object exposing IOBase, RawIOBase,
        BufferedIOBase and TextIOBase as attributes.
        """
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        """Test implementations inherit from their respective ABCs."""
        # The test instance itself carries the ABCs as attributes, so it
        # can stand in for the module.
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        """Test implementations inherit from the official ABCs of the
        baseline "io" module."""
        self._check_abc_inheritance(io)
2401
class CMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the C implementation of io."""

    io = io
2404
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the pyio implementation."""

    io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002407
def test_main():
    """Attach the io namespaces to the test classes, then run them all.

    Classes whose name starts with "C" receive the names of the C ``io``
    module, classes starting with "Py" those of ``pyio``; each also gets
    the matching mock classes.  Other classes are left untouched.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {member: getattr(io, member) for member in all_members}
    py_io_ns = {member: getattr(pyio, member) for member in all_members}

    # The mocks exist in "C"- and "Py"-prefixed flavours at module level;
    # expose each flavour under the unprefixed mock name.
    module_globals = globals()
    c_io_ns.update(
        {mock.__name__: module_globals["C" + mock.__name__] for mock in mocks})
    py_io_ns.update(
        {mock.__name__: module_globals["Py" + mock.__name__] for mock in mocks})

    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    prefixed_namespaces = (("C", c_io_ns), ("Py", py_io_ns))
    for test in tests:
        for prefix, namespace in prefixed_namespaces:
            if test.__name__.startswith(prefix):
                for attr_name, attr_value in namespace.items():
                    setattr(test, attr_name, attr_value)
                # Only the first matching prefix applies.
                break

    support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002441
# Run the full io test suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()