blob: 2874d79d95d31178e3e666f371ba2c1aa693347a [file] [log] [blame]
Antoine Pitrou19690592009-06-12 20:14:08 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
Christian Heimes1a6387e2008-03-26 12:49:49 +000022from __future__ import print_function
Christian Heimes3784c6b2008-03-26 23:13:59 +000023from __future__ import unicode_literals
Christian Heimes1a6387e2008-03-26 12:49:49 +000024
25import os
26import sys
27import time
28import array
Antoine Pitrou11ec65d2008-08-14 21:04:30 +000029import random
Christian Heimes1a6387e2008-03-26 12:49:49 +000030import unittest
Antoine Pitrou19690592009-06-12 20:14:08 +000031import weakref
Antoine Pitrou19690592009-06-12 20:14:08 +000032import abc
Georg Brandla4f46e12010-02-07 17:03:15 +000033from itertools import cycle, count
Antoine Pitrou19690592009-06-12 20:14:08 +000034from collections import deque
35from test import test_support as support
Christian Heimes1a6387e2008-03-26 12:49:49 +000036
37import codecs
Antoine Pitrou19690592009-06-12 20:14:08 +000038import io # C implementation of io
39import _pyio as pyio # Python implementation of io
Victor Stinner6a102812010-04-27 23:55:59 +000040try:
41 import threading
42except ImportError:
43 threading = None
Antoine Pitrou19690592009-06-12 20:14:08 +000044
45__metaclass__ = type
46bytes = support.py3k_bytes
47
48def _default_chunk_size():
49 """Get the default TextIOWrapper chunk size"""
50 with io.open(__file__, "r", encoding="latin1") as f:
51 return f._CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +000052
53
Antoine Pitrou19690592009-06-12 20:14:08 +000054class MockRawIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +000055
56 def __init__(self, read_stack=()):
57 self._read_stack = list(read_stack)
58 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +000059 self._reads = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +000060
61 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +000062 self._reads += 1
Christian Heimes1a6387e2008-03-26 12:49:49 +000063 try:
64 return self._read_stack.pop(0)
65 except:
66 return b""
67
68 def write(self, b):
Antoine Pitrou19690592009-06-12 20:14:08 +000069 self._write_stack.append(bytes(b))
Christian Heimes1a6387e2008-03-26 12:49:49 +000070 return len(b)
71
72 def writable(self):
73 return True
74
75 def fileno(self):
76 return 42
77
78 def readable(self):
79 return True
80
81 def seekable(self):
82 return True
83
84 def seek(self, pos, whence):
Antoine Pitrou19690592009-06-12 20:14:08 +000085 return 0 # wrong but we gotta return something
Christian Heimes1a6387e2008-03-26 12:49:49 +000086
87 def tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +000088 return 0 # same comment as above
89
90 def readinto(self, buf):
91 self._reads += 1
92 max_len = len(buf)
93 try:
94 data = self._read_stack[0]
95 except IndexError:
96 return 0
97 if data is None:
98 del self._read_stack[0]
99 return None
100 n = len(data)
101 if len(data) <= max_len:
102 del self._read_stack[0]
103 buf[:n] = data
104 return n
105 else:
106 buf[:] = data[:max_len]
107 self._read_stack[0] = data[max_len:]
108 return max_len
109
110 def truncate(self, pos=None):
111 return pos
112
113class CMockRawIO(MockRawIO, io.RawIOBase):
114 pass
115
116class PyMockRawIO(MockRawIO, pyio.RawIOBase):
117 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000118
119
Antoine Pitrou19690592009-06-12 20:14:08 +0000120class MisbehavedRawIO(MockRawIO):
121 def write(self, b):
122 return MockRawIO.write(self, b) * 2
123
124 def read(self, n=None):
125 return MockRawIO.read(self, n) * 2
126
127 def seek(self, pos, whence):
128 return -123
129
130 def tell(self):
131 return -456
132
133 def readinto(self, buf):
134 MockRawIO.readinto(self, buf)
135 return len(buf) * 5
136
137class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
138 pass
139
140class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
141 pass
142
143
144class CloseFailureIO(MockRawIO):
145 closed = 0
146
147 def close(self):
148 if not self.closed:
149 self.closed = 1
150 raise IOError
151
152class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
153 pass
154
155class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
156 pass
157
158
159class MockFileIO:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000160
161 def __init__(self, data):
162 self.read_history = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000163 super(MockFileIO, self).__init__(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000164
165 def read(self, n=None):
Antoine Pitrou19690592009-06-12 20:14:08 +0000166 res = super(MockFileIO, self).read(n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000167 self.read_history.append(None if res is None else len(res))
168 return res
169
Antoine Pitrou19690592009-06-12 20:14:08 +0000170 def readinto(self, b):
171 res = super(MockFileIO, self).readinto(b)
172 self.read_history.append(res)
173 return res
Christian Heimes1a6387e2008-03-26 12:49:49 +0000174
Antoine Pitrou19690592009-06-12 20:14:08 +0000175class CMockFileIO(MockFileIO, io.BytesIO):
176 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000177
Antoine Pitrou19690592009-06-12 20:14:08 +0000178class PyMockFileIO(MockFileIO, pyio.BytesIO):
179 pass
180
181
182class MockNonBlockWriterIO:
183
184 def __init__(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000185 self._write_stack = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000186 self._blocker_char = None
Christian Heimes1a6387e2008-03-26 12:49:49 +0000187
Antoine Pitrou19690592009-06-12 20:14:08 +0000188 def pop_written(self):
189 s = b"".join(self._write_stack)
190 self._write_stack[:] = []
191 return s
192
193 def block_on(self, char):
194 """Block when a given char is encountered."""
195 self._blocker_char = char
196
197 def readable(self):
198 return True
199
200 def seekable(self):
201 return True
Christian Heimes1a6387e2008-03-26 12:49:49 +0000202
203 def writable(self):
204 return True
205
Antoine Pitrou19690592009-06-12 20:14:08 +0000206 def write(self, b):
207 b = bytes(b)
208 n = -1
209 if self._blocker_char:
210 try:
211 n = b.index(self._blocker_char)
212 except ValueError:
213 pass
214 else:
215 self._blocker_char = None
216 self._write_stack.append(b[:n])
217 raise self.BlockingIOError(0, "test blocking", n)
218 self._write_stack.append(b)
219 return len(b)
220
221class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
222 BlockingIOError = io.BlockingIOError
223
224class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
225 BlockingIOError = pyio.BlockingIOError
226
Christian Heimes1a6387e2008-03-26 12:49:49 +0000227
228class IOTest(unittest.TestCase):
229
Antoine Pitrou19690592009-06-12 20:14:08 +0000230 def setUp(self):
231 support.unlink(support.TESTFN)
232
Christian Heimes1a6387e2008-03-26 12:49:49 +0000233 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000234 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000235
236 def write_ops(self, f):
237 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000238 f.truncate(0)
239 self.assertEqual(f.tell(), 5)
240 f.seek(0)
241
242 self.assertEqual(f.write(b"blah."), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000243 self.assertEqual(f.seek(0), 0)
244 self.assertEqual(f.write(b"Hello."), 6)
245 self.assertEqual(f.tell(), 6)
246 self.assertEqual(f.seek(-1, 1), 5)
247 self.assertEqual(f.tell(), 5)
248 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
249 self.assertEqual(f.seek(0), 0)
250 self.assertEqual(f.write(b"h"), 1)
251 self.assertEqual(f.seek(-1, 2), 13)
252 self.assertEqual(f.tell(), 13)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000253
Christian Heimes1a6387e2008-03-26 12:49:49 +0000254 self.assertEqual(f.truncate(12), 12)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000255 self.assertEqual(f.tell(), 13)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000256 self.assertRaises(TypeError, f.seek, 0.0)
257
258 def read_ops(self, f, buffered=False):
259 data = f.read(5)
260 self.assertEqual(data, b"hello")
261 data = bytearray(data)
262 self.assertEqual(f.readinto(data), 5)
263 self.assertEqual(data, b" worl")
264 self.assertEqual(f.readinto(data), 2)
265 self.assertEqual(len(data), 5)
266 self.assertEqual(data[:2], b"d\n")
267 self.assertEqual(f.seek(0), 0)
268 self.assertEqual(f.read(20), b"hello world\n")
269 self.assertEqual(f.read(1), b"")
270 self.assertEqual(f.readinto(bytearray(b"x")), 0)
271 self.assertEqual(f.seek(-6, 2), 6)
272 self.assertEqual(f.read(5), b"world")
273 self.assertEqual(f.read(0), b"")
274 self.assertEqual(f.readinto(bytearray()), 0)
275 self.assertEqual(f.seek(-6, 1), 5)
276 self.assertEqual(f.read(5), b" worl")
277 self.assertEqual(f.tell(), 10)
278 self.assertRaises(TypeError, f.seek, 0.0)
279 if buffered:
280 f.seek(0)
281 self.assertEqual(f.read(), b"hello world\n")
282 f.seek(6)
283 self.assertEqual(f.read(), b"world\n")
284 self.assertEqual(f.read(), b"")
285
286 LARGE = 2**31
287
288 def large_file_ops(self, f):
289 assert f.readable()
290 assert f.writable()
291 self.assertEqual(f.seek(self.LARGE), self.LARGE)
292 self.assertEqual(f.tell(), self.LARGE)
293 self.assertEqual(f.write(b"xxx"), 3)
294 self.assertEqual(f.tell(), self.LARGE + 3)
295 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
296 self.assertEqual(f.truncate(), self.LARGE + 2)
297 self.assertEqual(f.tell(), self.LARGE + 2)
298 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
299 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +0000300 self.assertEqual(f.tell(), self.LARGE + 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000301 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
302 self.assertEqual(f.seek(-1, 2), self.LARGE)
303 self.assertEqual(f.read(2), b"x")
304
Antoine Pitrou19690592009-06-12 20:14:08 +0000305 def test_invalid_operations(self):
306 # Try writing on a file opened in read mode and vice-versa.
307 for mode in ("w", "wb"):
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000308 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000309 self.assertRaises(IOError, fp.read)
310 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000311 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000312 self.assertRaises(IOError, fp.write, b"blah")
313 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000314 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou19690592009-06-12 20:14:08 +0000315 self.assertRaises(IOError, fp.write, "blah")
316 self.assertRaises(IOError, fp.writelines, ["blah\n"])
317
Christian Heimes1a6387e2008-03-26 12:49:49 +0000318 def test_raw_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000319 with self.open(support.TESTFN, "wb", buffering=0) as f:
320 self.assertEqual(f.readable(), False)
321 self.assertEqual(f.writable(), True)
322 self.assertEqual(f.seekable(), True)
323 self.write_ops(f)
324 with self.open(support.TESTFN, "rb", buffering=0) as f:
325 self.assertEqual(f.readable(), True)
326 self.assertEqual(f.writable(), False)
327 self.assertEqual(f.seekable(), True)
328 self.read_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000329
330 def test_buffered_file_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000331 with self.open(support.TESTFN, "wb") as f:
332 self.assertEqual(f.readable(), False)
333 self.assertEqual(f.writable(), True)
334 self.assertEqual(f.seekable(), True)
335 self.write_ops(f)
336 with self.open(support.TESTFN, "rb") as f:
337 self.assertEqual(f.readable(), True)
338 self.assertEqual(f.writable(), False)
339 self.assertEqual(f.seekable(), True)
340 self.read_ops(f, True)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000341
342 def test_readline(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000343 with self.open(support.TESTFN, "wb") as f:
344 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
345 with self.open(support.TESTFN, "rb") as f:
346 self.assertEqual(f.readline(), b"abc\n")
347 self.assertEqual(f.readline(10), b"def\n")
348 self.assertEqual(f.readline(2), b"xy")
349 self.assertEqual(f.readline(4), b"zzy\n")
350 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000351 self.assertEqual(f.readline(None), b"another line")
Antoine Pitrou19690592009-06-12 20:14:08 +0000352 self.assertRaises(TypeError, f.readline, 5.3)
353 with self.open(support.TESTFN, "r") as f:
354 self.assertRaises(TypeError, f.readline, 5.3)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000355
356 def test_raw_bytes_io(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000357 f = self.BytesIO()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000358 self.write_ops(f)
359 data = f.getvalue()
360 self.assertEqual(data, b"hello world\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000361 f = self.BytesIO(data)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000362 self.read_ops(f, True)
363
364 def test_large_file_ops(self):
365 # On Windows and Mac OSX this test comsumes large resources; It takes
366 # a long time to build the >2GB file and takes >2GB of disk space
367 # therefore the resource must be enabled to run this test.
Antoine Pitrou19690592009-06-12 20:14:08 +0000368 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
369 if not support.is_resource_enabled("largefile"):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000370 print("\nTesting large file ops skipped on %s." % sys.platform,
371 file=sys.stderr)
372 print("It requires %d bytes and a long time." % self.LARGE,
373 file=sys.stderr)
374 print("Use 'regrtest.py -u largefile test_io' to run it.",
375 file=sys.stderr)
376 return
Antoine Pitrou19690592009-06-12 20:14:08 +0000377 with self.open(support.TESTFN, "w+b", 0) as f:
378 self.large_file_ops(f)
379 with self.open(support.TESTFN, "w+b") as f:
380 self.large_file_ops(f)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000381
382 def test_with_open(self):
383 for bufsize in (0, 1, 100):
384 f = None
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000385 with self.open(support.TESTFN, "wb", bufsize) as f:
Christian Heimes1a6387e2008-03-26 12:49:49 +0000386 f.write(b"xxx")
387 self.assertEqual(f.closed, True)
388 f = None
389 try:
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000390 with self.open(support.TESTFN, "wb", bufsize) as f:
Ezio Melottidde5b942010-02-03 05:37:26 +0000391 1 // 0
Christian Heimes1a6387e2008-03-26 12:49:49 +0000392 except ZeroDivisionError:
393 self.assertEqual(f.closed, True)
394 else:
Ezio Melottidde5b942010-02-03 05:37:26 +0000395 self.fail("1 // 0 didn't raise an exception")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000396
Antoine Pitroue741cc62009-01-21 00:45:36 +0000397 # issue 5008
398 def test_append_mode_tell(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000399 with self.open(support.TESTFN, "wb") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000400 f.write(b"xxx")
Antoine Pitrou19690592009-06-12 20:14:08 +0000401 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000402 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000403 with self.open(support.TESTFN, "ab") as f:
Antoine Pitroue741cc62009-01-21 00:45:36 +0000404 self.assertEqual(f.tell(), 3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000405 with self.open(support.TESTFN, "a") as f:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000406 self.assertTrue(f.tell() > 0)
Antoine Pitroue741cc62009-01-21 00:45:36 +0000407
Christian Heimes1a6387e2008-03-26 12:49:49 +0000408 def test_destructor(self):
409 record = []
Antoine Pitrou19690592009-06-12 20:14:08 +0000410 class MyFileIO(self.FileIO):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000411 def __del__(self):
412 record.append(1)
Antoine Pitrou19690592009-06-12 20:14:08 +0000413 try:
414 f = super(MyFileIO, self).__del__
415 except AttributeError:
416 pass
417 else:
418 f()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000419 def close(self):
420 record.append(2)
Antoine Pitrou19690592009-06-12 20:14:08 +0000421 super(MyFileIO, self).close()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000422 def flush(self):
423 record.append(3)
Antoine Pitrou19690592009-06-12 20:14:08 +0000424 super(MyFileIO, self).flush()
425 f = MyFileIO(support.TESTFN, "wb")
426 f.write(b"xxx")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000427 del f
Antoine Pitrou19690592009-06-12 20:14:08 +0000428 support.gc_collect()
429 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000430 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000431 self.assertEqual(f.read(), b"xxx")
432
433 def _check_base_destructor(self, base):
434 record = []
435 class MyIO(base):
436 def __init__(self):
437 # This exercises the availability of attributes on object
438 # destruction.
439 # (in the C version, close() is called by the tp_dealloc
440 # function, not by __del__)
441 self.on_del = 1
442 self.on_close = 2
443 self.on_flush = 3
444 def __del__(self):
445 record.append(self.on_del)
446 try:
447 f = super(MyIO, self).__del__
448 except AttributeError:
449 pass
450 else:
451 f()
452 def close(self):
453 record.append(self.on_close)
454 super(MyIO, self).close()
455 def flush(self):
456 record.append(self.on_flush)
457 super(MyIO, self).flush()
458 f = MyIO()
459 del f
460 support.gc_collect()
Christian Heimes1a6387e2008-03-26 12:49:49 +0000461 self.assertEqual(record, [1, 2, 3])
462
Antoine Pitrou19690592009-06-12 20:14:08 +0000463 def test_IOBase_destructor(self):
464 self._check_base_destructor(self.IOBase)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000465
Antoine Pitrou19690592009-06-12 20:14:08 +0000466 def test_RawIOBase_destructor(self):
467 self._check_base_destructor(self.RawIOBase)
468
469 def test_BufferedIOBase_destructor(self):
470 self._check_base_destructor(self.BufferedIOBase)
471
472 def test_TextIOBase_destructor(self):
473 self._check_base_destructor(self.TextIOBase)
474
475 def test_close_flushes(self):
476 with self.open(support.TESTFN, "wb") as f:
477 f.write(b"xxx")
478 with self.open(support.TESTFN, "rb") as f:
479 self.assertEqual(f.read(), b"xxx")
480
481 def test_array_writes(self):
482 a = array.array(b'i', range(10))
483 n = len(a.tostring())
484 with self.open(support.TESTFN, "wb", 0) as f:
485 self.assertEqual(f.write(a), n)
486 with self.open(support.TESTFN, "wb") as f:
487 self.assertEqual(f.write(a), n)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000488
489 def test_closefd(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000490 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Christian Heimes1a6387e2008-03-26 12:49:49 +0000491 closefd=False)
492
Antoine Pitrou19690592009-06-12 20:14:08 +0000493 def test_read_closed(self):
494 with self.open(support.TESTFN, "w") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000495 f.write("egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000496 with self.open(support.TESTFN, "r") as f:
497 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000498 self.assertEqual(file.read(), "egg\n")
499 file.seek(0)
500 file.close()
501 self.assertRaises(ValueError, file.read)
502
503 def test_no_closefd_with_filename(self):
504 # can't use closefd in combination with a file name
Antoine Pitrou19690592009-06-12 20:14:08 +0000505 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000506
507 def test_closefd_attr(self):
Antoine Pitrou19690592009-06-12 20:14:08 +0000508 with self.open(support.TESTFN, "wb") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000509 f.write(b"egg\n")
Antoine Pitrou19690592009-06-12 20:14:08 +0000510 with self.open(support.TESTFN, "r") as f:
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000511 self.assertEqual(f.buffer.raw.closefd, True)
Antoine Pitrou19690592009-06-12 20:14:08 +0000512 file = self.open(f.fileno(), "r", closefd=False)
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000513 self.assertEqual(file.buffer.raw.closefd, False)
514
Antoine Pitrou19690592009-06-12 20:14:08 +0000515 def test_garbage_collection(self):
516 # FileIO objects are collected, and collecting them flushes
517 # all data to disk.
518 f = self.FileIO(support.TESTFN, "wb")
519 f.write(b"abcxxx")
520 f.f = f
521 wr = weakref.ref(f)
522 del f
523 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000524 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000525 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000526 self.assertEqual(f.read(), b"abcxxx")
Amaury Forgeot d'Arc32265652008-11-20 23:34:31 +0000527
Antoine Pitrou19690592009-06-12 20:14:08 +0000528 def test_unbounded_file(self):
529 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
530 zero = "/dev/zero"
531 if not os.path.exists(zero):
532 self.skipTest("{0} does not exist".format(zero))
533 if sys.maxsize > 0x7FFFFFFF:
534 self.skipTest("test can only run in a 32-bit address space")
535 if support.real_max_memuse < support._2G:
536 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000537 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000538 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000539 with self.open(zero, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000540 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000541 with self.open(zero, "r") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +0000542 self.assertRaises(OverflowError, f.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000543
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000544 def test_flush_error_on_close(self):
545 f = self.open(support.TESTFN, "wb", buffering=0)
546 def bad_flush():
547 raise IOError()
548 f.flush = bad_flush
549 self.assertRaises(IOError, f.close) # exception not swallowed
550
551 def test_multi_close(self):
552 f = self.open(support.TESTFN, "wb", buffering=0)
553 f.close()
554 f.close()
555 f.close()
556 self.assertRaises(ValueError, f.flush)
557
Antoine Pitrou19690592009-06-12 20:14:08 +0000558class CIOTest(IOTest):
559 pass
Christian Heimes1a6387e2008-03-26 12:49:49 +0000560
Antoine Pitrou19690592009-06-12 20:14:08 +0000561class PyIOTest(IOTest):
562 test_array_writes = unittest.skip(
563 "len(array.array) returns number of elements rather than bytelength"
564 )(IOTest.test_array_writes)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000565
566
Antoine Pitrou19690592009-06-12 20:14:08 +0000567class CommonBufferedTests:
568 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
569
570 def test_detach(self):
571 raw = self.MockRawIO()
572 buf = self.tp(raw)
573 self.assertIs(buf.detach(), raw)
574 self.assertRaises(ValueError, buf.detach)
575
576 def test_fileno(self):
577 rawio = self.MockRawIO()
578 bufio = self.tp(rawio)
579
580 self.assertEquals(42, bufio.fileno())
581
582 def test_no_fileno(self):
583 # XXX will we always have fileno() function? If so, kill
584 # this test. Else, write it.
585 pass
586
587 def test_invalid_args(self):
588 rawio = self.MockRawIO()
589 bufio = self.tp(rawio)
590 # Invalid whence
591 self.assertRaises(ValueError, bufio.seek, 0, -1)
592 self.assertRaises(ValueError, bufio.seek, 0, 3)
593
594 def test_override_destructor(self):
595 tp = self.tp
596 record = []
597 class MyBufferedIO(tp):
598 def __del__(self):
599 record.append(1)
600 try:
601 f = super(MyBufferedIO, self).__del__
602 except AttributeError:
603 pass
604 else:
605 f()
606 def close(self):
607 record.append(2)
608 super(MyBufferedIO, self).close()
609 def flush(self):
610 record.append(3)
611 super(MyBufferedIO, self).flush()
612 rawio = self.MockRawIO()
613 bufio = MyBufferedIO(rawio)
614 writable = bufio.writable()
615 del bufio
616 support.gc_collect()
617 if writable:
618 self.assertEqual(record, [1, 2, 3])
619 else:
620 self.assertEqual(record, [1, 2])
621
622 def test_context_manager(self):
623 # Test usability as a context manager
624 rawio = self.MockRawIO()
625 bufio = self.tp(rawio)
626 def _with():
627 with bufio:
628 pass
629 _with()
630 # bufio should now be closed, and using it a second time should raise
631 # a ValueError.
632 self.assertRaises(ValueError, _with)
633
634 def test_error_through_destructor(self):
635 # Test that the exception state is not modified by a destructor,
636 # even if close() fails.
637 rawio = self.CloseFailureIO()
638 def f():
639 self.tp(rawio).xyzzy
640 with support.captured_output("stderr") as s:
641 self.assertRaises(AttributeError, f)
642 s = s.getvalue().strip()
643 if s:
644 # The destructor *may* have printed an unraisable error, check it
645 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000646 self.assertTrue(s.startswith("Exception IOError: "), s)
647 self.assertTrue(s.endswith(" ignored"), s)
Antoine Pitrou19690592009-06-12 20:14:08 +0000648
649 def test_repr(self):
650 raw = self.MockRawIO()
651 b = self.tp(raw)
652 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
653 self.assertEqual(repr(b), "<%s>" % clsname)
654 raw.name = "dummy"
655 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
656 raw.name = b"dummy"
657 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000658
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +0000659 def test_flush_error_on_close(self):
660 raw = self.MockRawIO()
661 def bad_flush():
662 raise IOError()
663 raw.flush = bad_flush
664 b = self.tp(raw)
665 self.assertRaises(IOError, b.close) # exception not swallowed
666
667 def test_multi_close(self):
668 raw = self.MockRawIO()
669 b = self.tp(raw)
670 b.close()
671 b.close()
672 b.close()
673 self.assertRaises(ValueError, b.flush)
674
Christian Heimes1a6387e2008-03-26 12:49:49 +0000675
Antoine Pitrou19690592009-06-12 20:14:08 +0000676class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
677 read_mode = "rb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000678
Antoine Pitrou19690592009-06-12 20:14:08 +0000679 def test_constructor(self):
680 rawio = self.MockRawIO([b"abc"])
681 bufio = self.tp(rawio)
682 bufio.__init__(rawio)
683 bufio.__init__(rawio, buffer_size=1024)
684 bufio.__init__(rawio, buffer_size=16)
685 self.assertEquals(b"abc", bufio.read())
686 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
687 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
688 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
689 rawio = self.MockRawIO([b"abc"])
690 bufio.__init__(rawio)
691 self.assertEquals(b"abc", bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000692
Antoine Pitrou19690592009-06-12 20:14:08 +0000693 def test_read(self):
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000694 for arg in (None, 7):
695 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
696 bufio = self.tp(rawio)
697 self.assertEquals(b"abcdefg", bufio.read(arg))
Antoine Pitrou19690592009-06-12 20:14:08 +0000698 # Invalid args
699 self.assertRaises(ValueError, bufio.read, -2)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000700
Antoine Pitrou19690592009-06-12 20:14:08 +0000701 def test_read1(self):
702 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
703 bufio = self.tp(rawio)
704 self.assertEquals(b"a", bufio.read(1))
705 self.assertEquals(b"b", bufio.read1(1))
706 self.assertEquals(rawio._reads, 1)
707 self.assertEquals(b"c", bufio.read1(100))
708 self.assertEquals(rawio._reads, 1)
709 self.assertEquals(b"d", bufio.read1(100))
710 self.assertEquals(rawio._reads, 2)
711 self.assertEquals(b"efg", bufio.read1(100))
712 self.assertEquals(rawio._reads, 3)
713 self.assertEquals(b"", bufio.read1(100))
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000714 self.assertEquals(rawio._reads, 4)
Antoine Pitrou19690592009-06-12 20:14:08 +0000715 # Invalid args
716 self.assertRaises(ValueError, bufio.read1, -1)
717
718 def test_readinto(self):
719 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
720 bufio = self.tp(rawio)
721 b = bytearray(2)
722 self.assertEquals(bufio.readinto(b), 2)
723 self.assertEquals(b, b"ab")
724 self.assertEquals(bufio.readinto(b), 2)
725 self.assertEquals(b, b"cd")
726 self.assertEquals(bufio.readinto(b), 2)
727 self.assertEquals(b, b"ef")
728 self.assertEquals(bufio.readinto(b), 1)
729 self.assertEquals(b, b"gf")
730 self.assertEquals(bufio.readinto(b), 0)
731 self.assertEquals(b, b"gf")
732
Benjamin Petersonddd392c2009-12-13 19:19:07 +0000733 def test_readlines(self):
734 def bufio():
735 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
736 return self.tp(rawio)
737 self.assertEquals(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
738 self.assertEquals(bufio().readlines(5), [b"abc\n", b"d\n"])
739 self.assertEquals(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
740
Antoine Pitrou19690592009-06-12 20:14:08 +0000741 def test_buffering(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000742 data = b"abcdefghi"
743 dlen = len(data)
744
745 tests = [
746 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
747 [ 100, [ 3, 3, 3], [ dlen ] ],
748 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
749 ]
750
751 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +0000752 rawio = self.MockFileIO(data)
753 bufio = self.tp(rawio, buffer_size=bufsize)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000754 pos = 0
755 for nbytes in buf_read_sizes:
756 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
757 pos += nbytes
Antoine Pitrou19690592009-06-12 20:14:08 +0000758 # this is mildly implementation-dependent
Christian Heimes1a6387e2008-03-26 12:49:49 +0000759 self.assertEquals(rawio.read_history, raw_read_sizes)
760
Antoine Pitrou19690592009-06-12 20:14:08 +0000761 def test_read_non_blocking(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000762 # Inject some None's in there to simulate EWOULDBLOCK
Antoine Pitrou19690592009-06-12 20:14:08 +0000763 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
764 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000765
766 self.assertEquals(b"abcd", bufio.read(6))
767 self.assertEquals(b"e", bufio.read(1))
768 self.assertEquals(b"fg", bufio.read())
Antoine Pitrou19690592009-06-12 20:14:08 +0000769 self.assertEquals(b"", bufio.peek(1))
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000770 self.assertTrue(None is bufio.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000771 self.assertEquals(b"", bufio.read())
772
Antoine Pitrou19690592009-06-12 20:14:08 +0000773 def test_read_past_eof(self):
774 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
775 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000776
777 self.assertEquals(b"abcdefg", bufio.read(9000))
778
Antoine Pitrou19690592009-06-12 20:14:08 +0000779 def test_read_all(self):
780 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
781 bufio = self.tp(rawio)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000782
783 self.assertEquals(b"abcdefg", bufio.read())
784
Victor Stinner6a102812010-04-27 23:55:59 +0000785 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou19690592009-06-12 20:14:08 +0000786 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000787 try:
788 # Write out many bytes with exactly the same number of 0's,
789 # 1's... 255's. This will help us check that concurrent reading
790 # doesn't duplicate or forget contents.
791 N = 1000
Antoine Pitrou19690592009-06-12 20:14:08 +0000792 l = list(range(256)) * N
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000793 random.shuffle(l)
794 s = bytes(bytearray(l))
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000795 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000796 f.write(s)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +0000797 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +0000798 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000799 errors = []
800 results = []
801 def f():
802 try:
803 # Intra-buffer read then buffer-flushing read
804 for n in cycle([1, 19]):
805 s = bufio.read(n)
806 if not s:
807 break
808 # list.append() is atomic
809 results.append(s)
810 except Exception as e:
811 errors.append(e)
812 raise
813 threads = [threading.Thread(target=f) for x in range(20)]
814 for t in threads:
815 t.start()
816 time.sleep(0.02) # yield
817 for t in threads:
818 t.join()
819 self.assertFalse(errors,
820 "the following exceptions were caught: %r" % errors)
821 s = b''.join(results)
822 for i in range(256):
823 c = bytes(bytearray([i]))
824 self.assertEqual(s.count(c), N)
825 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +0000826 support.unlink(support.TESTFN)
827
828 def test_misbehaved_io(self):
829 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
830 bufio = self.tp(rawio)
831 self.assertRaises(IOError, bufio.seek, 0)
832 self.assertRaises(IOError, bufio.tell)
833
834class CBufferedReaderTest(BufferedReaderTest):
835 tp = io.BufferedReader
836
837 def test_constructor(self):
838 BufferedReaderTest.test_constructor(self)
839 # The allocation can succeed on 32-bit builds, e.g. with more
840 # than 2GB RAM and a 64-bit kernel.
841 if sys.maxsize > 0x7FFFFFFF:
842 rawio = self.MockRawIO()
843 bufio = self.tp(rawio)
844 self.assertRaises((OverflowError, MemoryError, ValueError),
845 bufio.__init__, rawio, sys.maxsize)
846
847 def test_initialization(self):
848 rawio = self.MockRawIO([b"abc"])
849 bufio = self.tp(rawio)
850 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
851 self.assertRaises(ValueError, bufio.read)
852 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
853 self.assertRaises(ValueError, bufio.read)
854 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
855 self.assertRaises(ValueError, bufio.read)
856
857 def test_misbehaved_io_read(self):
858 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
859 bufio = self.tp(rawio)
860 # _pyio.BufferedReader seems to implement reading different, so that
861 # checking this is not so easy.
862 self.assertRaises(IOError, bufio.read, 10)
863
864 def test_garbage_collection(self):
865 # C BufferedReader objects are collected.
866 # The Python version has __del__, so it ends into gc.garbage instead
867 rawio = self.FileIO(support.TESTFN, "w+b")
868 f = self.tp(rawio)
869 f.f = f
870 wr = weakref.ref(f)
871 del f
872 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000873 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +0000874
875class PyBufferedReaderTest(BufferedReaderTest):
876 tp = pyio.BufferedReader
Antoine Pitrou11ec65d2008-08-14 21:04:30 +0000877
878
Antoine Pitrou19690592009-06-12 20:14:08 +0000879class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
880 write_mode = "wb"
Christian Heimes1a6387e2008-03-26 12:49:49 +0000881
Antoine Pitrou19690592009-06-12 20:14:08 +0000882 def test_constructor(self):
883 rawio = self.MockRawIO()
884 bufio = self.tp(rawio)
885 bufio.__init__(rawio)
886 bufio.__init__(rawio, buffer_size=1024)
887 bufio.__init__(rawio, buffer_size=16)
888 self.assertEquals(3, bufio.write(b"abc"))
889 bufio.flush()
890 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
891 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
892 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
893 bufio.__init__(rawio)
894 self.assertEquals(3, bufio.write(b"ghi"))
895 bufio.flush()
896 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000897
Antoine Pitrou19690592009-06-12 20:14:08 +0000898 def test_detach_flush(self):
899 raw = self.MockRawIO()
900 buf = self.tp(raw)
901 buf.write(b"howdy!")
902 self.assertFalse(raw._write_stack)
903 buf.detach()
904 self.assertEqual(raw._write_stack, [b"howdy!"])
905
906 def test_write(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +0000907 # Write to the buffered IO but don't overflow the buffer.
Antoine Pitrou19690592009-06-12 20:14:08 +0000908 writer = self.MockRawIO()
909 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000910 bufio.write(b"abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000911 self.assertFalse(writer._write_stack)
912
Antoine Pitrou19690592009-06-12 20:14:08 +0000913 def test_write_overflow(self):
914 writer = self.MockRawIO()
915 bufio = self.tp(writer, 8)
916 contents = b"abcdefghijklmnop"
917 for n in range(0, len(contents), 3):
918 bufio.write(contents[n:n+3])
919 flushed = b"".join(writer._write_stack)
920 # At least (total - 8) bytes were implicitly flushed, perhaps more
921 # depending on the implementation.
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000922 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000923
Antoine Pitrou19690592009-06-12 20:14:08 +0000924 def check_writes(self, intermediate_func):
925 # Lots of writes, test the flushed output is as expected.
926 contents = bytes(range(256)) * 1000
927 n = 0
928 writer = self.MockRawIO()
929 bufio = self.tp(writer, 13)
930 # Generator of write sizes: repeat each N 15 times then proceed to N+1
931 def gen_sizes():
932 for size in count(1):
933 for i in range(15):
934 yield size
935 sizes = gen_sizes()
936 while n < len(contents):
937 size = min(next(sizes), len(contents) - n)
938 self.assertEquals(bufio.write(contents[n:n+size]), size)
939 intermediate_func(bufio)
940 n += size
941 bufio.flush()
942 self.assertEquals(contents,
943 b"".join(writer._write_stack))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000944
Antoine Pitrou19690592009-06-12 20:14:08 +0000945 def test_writes(self):
946 self.check_writes(lambda bufio: None)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000947
Antoine Pitrou19690592009-06-12 20:14:08 +0000948 def test_writes_and_flushes(self):
949 self.check_writes(lambda bufio: bufio.flush())
Christian Heimes1a6387e2008-03-26 12:49:49 +0000950
Antoine Pitrou19690592009-06-12 20:14:08 +0000951 def test_writes_and_seeks(self):
952 def _seekabs(bufio):
953 pos = bufio.tell()
954 bufio.seek(pos + 1, 0)
955 bufio.seek(pos - 1, 0)
956 bufio.seek(pos, 0)
957 self.check_writes(_seekabs)
958 def _seekrel(bufio):
959 pos = bufio.seek(0, 1)
960 bufio.seek(+1, 1)
961 bufio.seek(-1, 1)
962 bufio.seek(pos, 0)
963 self.check_writes(_seekrel)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000964
Antoine Pitrou19690592009-06-12 20:14:08 +0000965 def test_writes_and_truncates(self):
966 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Christian Heimes1a6387e2008-03-26 12:49:49 +0000967
Antoine Pitrou19690592009-06-12 20:14:08 +0000968 def test_write_non_blocking(self):
969 raw = self.MockNonBlockWriterIO()
970 bufio = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000971
Antoine Pitrou19690592009-06-12 20:14:08 +0000972 self.assertEquals(bufio.write(b"abcd"), 4)
973 self.assertEquals(bufio.write(b"efghi"), 5)
974 # 1 byte will be written, the rest will be buffered
975 raw.block_on(b"k")
976 self.assertEquals(bufio.write(b"jklmn"), 5)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000977
Antoine Pitrou19690592009-06-12 20:14:08 +0000978 # 8 bytes will be written, 8 will be buffered and the rest will be lost
979 raw.block_on(b"0")
980 try:
981 bufio.write(b"opqrwxyz0123456789")
982 except self.BlockingIOError as e:
983 written = e.characters_written
984 else:
985 self.fail("BlockingIOError should have been raised")
986 self.assertEquals(written, 16)
987 self.assertEquals(raw.pop_written(),
988 b"abcdefghijklmnopqrwxyz")
Christian Heimes1a6387e2008-03-26 12:49:49 +0000989
Antoine Pitrou19690592009-06-12 20:14:08 +0000990 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
991 s = raw.pop_written()
992 # Previously buffered bytes were flushed
993 self.assertTrue(s.startswith(b"01234567A"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +0000994
Antoine Pitrou19690592009-06-12 20:14:08 +0000995 def test_write_and_rewind(self):
996 raw = io.BytesIO()
997 bufio = self.tp(raw, 4)
998 self.assertEqual(bufio.write(b"abcdef"), 6)
999 self.assertEqual(bufio.tell(), 6)
1000 bufio.seek(0, 0)
1001 self.assertEqual(bufio.write(b"XY"), 2)
1002 bufio.seek(6, 0)
1003 self.assertEqual(raw.getvalue(), b"XYcdef")
1004 self.assertEqual(bufio.write(b"123456"), 6)
1005 bufio.flush()
1006 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001007
Antoine Pitrou19690592009-06-12 20:14:08 +00001008 def test_flush(self):
1009 writer = self.MockRawIO()
1010 bufio = self.tp(writer, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001011 bufio.write(b"abc")
1012 bufio.flush()
Christian Heimes1a6387e2008-03-26 12:49:49 +00001013 self.assertEquals(b"abc", writer._write_stack[0])
1014
Antoine Pitrou19690592009-06-12 20:14:08 +00001015 def test_destructor(self):
1016 writer = self.MockRawIO()
1017 bufio = self.tp(writer, 8)
1018 bufio.write(b"abc")
1019 del bufio
1020 support.gc_collect()
1021 self.assertEquals(b"abc", writer._write_stack[0])
1022
1023 def test_truncate(self):
1024 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001025 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001026 bufio = self.tp(raw, 8)
1027 bufio.write(b"abcdef")
1028 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001029 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001030 with self.open(support.TESTFN, "rb", buffering=0) as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001031 self.assertEqual(f.read(), b"abc")
1032
Victor Stinner6a102812010-04-27 23:55:59 +00001033 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou19690592009-06-12 20:14:08 +00001034 def test_threads(self):
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001035 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001036 # Write out many bytes from many threads and test they were
1037 # all flushed.
1038 N = 1000
1039 contents = bytes(range(256)) * N
1040 sizes = cycle([1, 19])
1041 n = 0
1042 queue = deque()
1043 while n < len(contents):
1044 size = next(sizes)
1045 queue.append(contents[n:n+size])
1046 n += size
1047 del contents
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001048 # We use a real file object because it allows us to
1049 # exercise situations where the GIL is released before
1050 # writing the buffer to the raw streams. This is in addition
1051 # to concurrency issues due to switching threads in the middle
1052 # of Python code.
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001053 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Antoine Pitrou19690592009-06-12 20:14:08 +00001054 bufio = self.tp(raw, 8)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001055 errors = []
1056 def f():
1057 try:
Antoine Pitrou19690592009-06-12 20:14:08 +00001058 while True:
1059 try:
1060 s = queue.popleft()
1061 except IndexError:
1062 return
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001063 bufio.write(s)
1064 except Exception as e:
1065 errors.append(e)
1066 raise
1067 threads = [threading.Thread(target=f) for x in range(20)]
1068 for t in threads:
1069 t.start()
1070 time.sleep(0.02) # yield
1071 for t in threads:
1072 t.join()
1073 self.assertFalse(errors,
1074 "the following exceptions were caught: %r" % errors)
Antoine Pitrou19690592009-06-12 20:14:08 +00001075 bufio.close()
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001076 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001077 s = f.read()
1078 for i in range(256):
1079 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001080 finally:
Antoine Pitrou19690592009-06-12 20:14:08 +00001081 support.unlink(support.TESTFN)
Antoine Pitrou11ec65d2008-08-14 21:04:30 +00001082
Antoine Pitrou19690592009-06-12 20:14:08 +00001083 def test_misbehaved_io(self):
1084 rawio = self.MisbehavedRawIO()
1085 bufio = self.tp(rawio, 5)
1086 self.assertRaises(IOError, bufio.seek, 0)
1087 self.assertRaises(IOError, bufio.tell)
1088 self.assertRaises(IOError, bufio.write, b"abcdef")
1089
1090 def test_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001091 with support.check_warnings(("max_buffer_size is deprecated",
1092 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001093 self.tp(self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001094
1095
1096class CBufferedWriterTest(BufferedWriterTest):
1097 tp = io.BufferedWriter
1098
1099 def test_constructor(self):
1100 BufferedWriterTest.test_constructor(self)
1101 # The allocation can succeed on 32-bit builds, e.g. with more
1102 # than 2GB RAM and a 64-bit kernel.
1103 if sys.maxsize > 0x7FFFFFFF:
1104 rawio = self.MockRawIO()
1105 bufio = self.tp(rawio)
1106 self.assertRaises((OverflowError, MemoryError, ValueError),
1107 bufio.__init__, rawio, sys.maxsize)
1108
1109 def test_initialization(self):
1110 rawio = self.MockRawIO()
1111 bufio = self.tp(rawio)
1112 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1113 self.assertRaises(ValueError, bufio.write, b"def")
1114 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1115 self.assertRaises(ValueError, bufio.write, b"def")
1116 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1117 self.assertRaises(ValueError, bufio.write, b"def")
1118
1119 def test_garbage_collection(self):
1120 # C BufferedWriter objects are collected, and collecting them flushes
1121 # all data to disk.
1122 # The Python version has __del__, so it ends into gc.garbage instead
1123 rawio = self.FileIO(support.TESTFN, "w+b")
1124 f = self.tp(rawio)
1125 f.write(b"123xxx")
1126 f.x = f
1127 wr = weakref.ref(f)
1128 del f
1129 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001130 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00001131 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001132 self.assertEqual(f.read(), b"123xxx")
1133
1134
1135class PyBufferedWriterTest(BufferedWriterTest):
1136 tp = pyio.BufferedWriter
Christian Heimes1a6387e2008-03-26 12:49:49 +00001137
1138class BufferedRWPairTest(unittest.TestCase):
1139
Antoine Pitrou19690592009-06-12 20:14:08 +00001140 def test_constructor(self):
1141 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson54686e32008-12-24 15:10:27 +00001142 self.assertFalse(pair.closed)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001143
Antoine Pitrou19690592009-06-12 20:14:08 +00001144 def test_detach(self):
1145 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1146 self.assertRaises(self.UnsupportedOperation, pair.detach)
1147
1148 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna945a8ba2010-03-17 19:15:56 +00001149 with support.check_warnings(("max_buffer_size is deprecated",
1150 DeprecationWarning)):
Antoine Pitrou19690592009-06-12 20:14:08 +00001151 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Antoine Pitrou19690592009-06-12 20:14:08 +00001152
1153 def test_constructor_with_not_readable(self):
1154 class NotReadable(MockRawIO):
1155 def readable(self):
1156 return False
1157
1158 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1159
1160 def test_constructor_with_not_writeable(self):
1161 class NotWriteable(MockRawIO):
1162 def writable(self):
1163 return False
1164
1165 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1166
1167 def test_read(self):
1168 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1169
1170 self.assertEqual(pair.read(3), b"abc")
1171 self.assertEqual(pair.read(1), b"d")
1172 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001173 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1174 self.assertEqual(pair.read(None), b"abc")
1175
1176 def test_readlines(self):
1177 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1178 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1179 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1180 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitrou19690592009-06-12 20:14:08 +00001181
1182 def test_read1(self):
1183 # .read1() is delegated to the underlying reader object, so this test
1184 # can be shallow.
1185 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1186
1187 self.assertEqual(pair.read1(3), b"abc")
1188
1189 def test_readinto(self):
1190 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1191
1192 data = bytearray(5)
1193 self.assertEqual(pair.readinto(data), 5)
1194 self.assertEqual(data, b"abcde")
1195
1196 def test_write(self):
1197 w = self.MockRawIO()
1198 pair = self.tp(self.MockRawIO(), w)
1199
1200 pair.write(b"abc")
1201 pair.flush()
1202 pair.write(b"def")
1203 pair.flush()
1204 self.assertEqual(w._write_stack, [b"abc", b"def"])
1205
1206 def test_peek(self):
1207 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1208
1209 self.assertTrue(pair.peek(3).startswith(b"abc"))
1210 self.assertEqual(pair.read(3), b"abc")
1211
1212 def test_readable(self):
1213 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1214 self.assertTrue(pair.readable())
1215
1216 def test_writeable(self):
1217 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1218 self.assertTrue(pair.writable())
1219
1220 def test_seekable(self):
1221 # BufferedRWPairs are never seekable, even if their readers and writers
1222 # are.
1223 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1224 self.assertFalse(pair.seekable())
1225
1226 # .flush() is delegated to the underlying writer object and has been
1227 # tested in the test_write method.
1228
1229 def test_close_and_closed(self):
1230 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1231 self.assertFalse(pair.closed)
1232 pair.close()
1233 self.assertTrue(pair.closed)
1234
1235 def test_isatty(self):
1236 class SelectableIsAtty(MockRawIO):
1237 def __init__(self, isatty):
1238 MockRawIO.__init__(self)
1239 self._isatty = isatty
1240
1241 def isatty(self):
1242 return self._isatty
1243
1244 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1245 self.assertFalse(pair.isatty())
1246
1247 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1248 self.assertTrue(pair.isatty())
1249
1250 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1251 self.assertTrue(pair.isatty())
1252
1253 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1254 self.assertTrue(pair.isatty())
1255
1256class CBufferedRWPairTest(BufferedRWPairTest):
1257 tp = io.BufferedRWPair
1258
1259class PyBufferedRWPairTest(BufferedRWPairTest):
1260 tp = pyio.BufferedRWPair
Christian Heimes1a6387e2008-03-26 12:49:49 +00001261
1262
Antoine Pitrou19690592009-06-12 20:14:08 +00001263class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1264 read_mode = "rb+"
1265 write_mode = "wb+"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001266
Antoine Pitrou19690592009-06-12 20:14:08 +00001267 def test_constructor(self):
1268 BufferedReaderTest.test_constructor(self)
1269 BufferedWriterTest.test_constructor(self)
1270
1271 def test_read_and_write(self):
1272 raw = self.MockRawIO((b"asdf", b"ghjk"))
1273 rw = self.tp(raw, 8)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001274
1275 self.assertEqual(b"as", rw.read(2))
1276 rw.write(b"ddd")
1277 rw.write(b"eee")
1278 self.assertFalse(raw._write_stack) # Buffer writes
Antoine Pitrou19690592009-06-12 20:14:08 +00001279 self.assertEqual(b"ghjk", rw.read())
Christian Heimes1a6387e2008-03-26 12:49:49 +00001280 self.assertEquals(b"dddeee", raw._write_stack[0])
1281
Antoine Pitrou19690592009-06-12 20:14:08 +00001282 def test_seek_and_tell(self):
1283 raw = self.BytesIO(b"asdfghjkl")
1284 rw = self.tp(raw)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001285
1286 self.assertEquals(b"as", rw.read(2))
1287 self.assertEquals(2, rw.tell())
1288 rw.seek(0, 0)
1289 self.assertEquals(b"asdf", rw.read(4))
1290
1291 rw.write(b"asdf")
1292 rw.seek(0, 0)
1293 self.assertEquals(b"asdfasdfl", rw.read())
1294 self.assertEquals(9, rw.tell())
1295 rw.seek(-4, 2)
1296 self.assertEquals(5, rw.tell())
1297 rw.seek(2, 1)
1298 self.assertEquals(7, rw.tell())
1299 self.assertEquals(b"fl", rw.read(11))
1300 self.assertRaises(TypeError, rw.seek, 0.0)
1301
Antoine Pitrou19690592009-06-12 20:14:08 +00001302 def check_flush_and_read(self, read_func):
1303 raw = self.BytesIO(b"abcdefghi")
1304 bufio = self.tp(raw)
1305
1306 self.assertEquals(b"ab", read_func(bufio, 2))
1307 bufio.write(b"12")
1308 self.assertEquals(b"ef", read_func(bufio, 2))
1309 self.assertEquals(6, bufio.tell())
1310 bufio.flush()
1311 self.assertEquals(6, bufio.tell())
1312 self.assertEquals(b"ghi", read_func(bufio))
1313 raw.seek(0, 0)
1314 raw.write(b"XYZ")
1315 # flush() resets the read buffer
1316 bufio.flush()
1317 bufio.seek(0, 0)
1318 self.assertEquals(b"XYZ", read_func(bufio, 3))
1319
1320 def test_flush_and_read(self):
1321 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1322
1323 def test_flush_and_readinto(self):
1324 def _readinto(bufio, n=-1):
1325 b = bytearray(n if n >= 0 else 9999)
1326 n = bufio.readinto(b)
1327 return bytes(b[:n])
1328 self.check_flush_and_read(_readinto)
1329
1330 def test_flush_and_peek(self):
1331 def _peek(bufio, n=-1):
1332 # This relies on the fact that the buffer can contain the whole
1333 # raw stream, otherwise peek() can return less.
1334 b = bufio.peek(n)
1335 if n != -1:
1336 b = b[:n]
1337 bufio.seek(len(b), 1)
1338 return b
1339 self.check_flush_and_read(_peek)
1340
1341 def test_flush_and_write(self):
1342 raw = self.BytesIO(b"abcdefghi")
1343 bufio = self.tp(raw)
1344
1345 bufio.write(b"123")
1346 bufio.flush()
1347 bufio.write(b"45")
1348 bufio.flush()
1349 bufio.seek(0, 0)
1350 self.assertEquals(b"12345fghi", raw.getvalue())
1351 self.assertEquals(b"12345fghi", bufio.read())
1352
1353 def test_threads(self):
1354 BufferedReaderTest.test_threads(self)
1355 BufferedWriterTest.test_threads(self)
1356
1357 def test_writes_and_peek(self):
1358 def _peek(bufio):
1359 bufio.peek(1)
1360 self.check_writes(_peek)
1361 def _peek(bufio):
1362 pos = bufio.tell()
1363 bufio.seek(-1, 1)
1364 bufio.peek(1)
1365 bufio.seek(pos, 0)
1366 self.check_writes(_peek)
1367
1368 def test_writes_and_reads(self):
1369 def _read(bufio):
1370 bufio.seek(-1, 1)
1371 bufio.read(1)
1372 self.check_writes(_read)
1373
1374 def test_writes_and_read1s(self):
1375 def _read1(bufio):
1376 bufio.seek(-1, 1)
1377 bufio.read1(1)
1378 self.check_writes(_read1)
1379
1380 def test_writes_and_readintos(self):
1381 def _read(bufio):
1382 bufio.seek(-1, 1)
1383 bufio.readinto(bytearray(1))
1384 self.check_writes(_read)
1385
Antoine Pitrou20e1f932009-08-06 20:18:29 +00001386 def test_write_after_readahead(self):
1387 # Issue #6629: writing after the buffer was filled by readahead should
1388 # first rewind the raw stream.
1389 for overwrite_size in [1, 5]:
1390 raw = self.BytesIO(b"A" * 10)
1391 bufio = self.tp(raw, 4)
1392 # Trigger readahead
1393 self.assertEqual(bufio.read(1), b"A")
1394 self.assertEqual(bufio.tell(), 1)
1395 # Overwriting should rewind the raw stream if it needs so
1396 bufio.write(b"B" * overwrite_size)
1397 self.assertEqual(bufio.tell(), overwrite_size + 1)
1398 # If the write size was smaller than the buffer size, flush() and
1399 # check that rewind happens.
1400 bufio.flush()
1401 self.assertEqual(bufio.tell(), overwrite_size + 1)
1402 s = raw.getvalue()
1403 self.assertEqual(s,
1404 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1405
Antoine Pitrouf3fa0742010-01-31 22:26:04 +00001406 def test_truncate_after_read_or_write(self):
1407 raw = self.BytesIO(b"A" * 10)
1408 bufio = self.tp(raw, 100)
1409 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1410 self.assertEqual(bufio.truncate(), 2)
1411 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1412 self.assertEqual(bufio.truncate(), 4)
1413
Antoine Pitrou19690592009-06-12 20:14:08 +00001414 def test_misbehaved_io(self):
1415 BufferedReaderTest.test_misbehaved_io(self)
1416 BufferedWriterTest.test_misbehaved_io(self)
1417
1418class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest):
1419 tp = io.BufferedRandom
1420
1421 def test_constructor(self):
1422 BufferedRandomTest.test_constructor(self)
1423 # The allocation can succeed on 32-bit builds, e.g. with more
1424 # than 2GB RAM and a 64-bit kernel.
1425 if sys.maxsize > 0x7FFFFFFF:
1426 rawio = self.MockRawIO()
1427 bufio = self.tp(rawio)
1428 self.assertRaises((OverflowError, MemoryError, ValueError),
1429 bufio.__init__, rawio, sys.maxsize)
1430
1431 def test_garbage_collection(self):
1432 CBufferedReaderTest.test_garbage_collection(self)
1433 CBufferedWriterTest.test_garbage_collection(self)
1434
1435class PyBufferedRandomTest(BufferedRandomTest):
1436 tp = pyio.BufferedRandom
1437
1438
Christian Heimes1a6387e2008-03-26 12:49:49 +00001439# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1440# properties:
1441# - A single output character can correspond to many bytes of input.
1442# - The number of input bytes to complete the character can be
1443# undetermined until the last input byte is received.
1444# - The number of input bytes can vary depending on previous input.
1445# - A single input byte can correspond to many characters of output.
1446# - The number of output characters can be undetermined until the
1447# last input byte is received.
1448# - The number of output characters can vary depending on previous input.
1449
1450class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1451 """
1452 For testing seek/tell behavior with a stateful, buffering decoder.
1453
1454 Input is a sequence of words. Words may be fixed-length (length set
1455 by input) or variable-length (period-terminated). In variable-length
1456 mode, extra periods are ignored. Possible words are:
1457 - 'i' followed by a number sets the input length, I (maximum 99).
1458 When I is set to 0, words are space-terminated.
1459 - 'o' followed by a number sets the output length, O (maximum 99).
1460 - Any other word is converted into a word followed by a period on
1461 the output. The output word consists of the input word truncated
1462 or padded out with hyphens to make its length equal to O. If O
1463 is 0, the word is output verbatim without truncating or padding.
1464 I and O are initially set to 1. When I changes, any buffered input is
1465 re-scanned according to the new I. EOF also terminates the last word.
1466 """
1467
1468 def __init__(self, errors='strict'):
1469 codecs.IncrementalDecoder.__init__(self, errors)
1470 self.reset()
1471
1472 def __repr__(self):
1473 return '<SID %x>' % id(self)
1474
1475 def reset(self):
1476 self.i = 1
1477 self.o = 1
1478 self.buffer = bytearray()
1479
1480 def getstate(self):
1481 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1482 return bytes(self.buffer), i*100 + o
1483
1484 def setstate(self, state):
1485 buffer, io = state
1486 self.buffer = bytearray(buffer)
1487 i, o = divmod(io, 100)
1488 self.i, self.o = i ^ 1, o ^ 1
1489
1490 def decode(self, input, final=False):
1491 output = ''
1492 for b in input:
1493 if self.i == 0: # variable-length, terminated with period
Amaury Forgeot d'Arcce6f6c12008-04-01 22:37:33 +00001494 if b == '.':
Christian Heimes1a6387e2008-03-26 12:49:49 +00001495 if self.buffer:
1496 output += self.process_word()
1497 else:
1498 self.buffer.append(b)
1499 else: # fixed-length, terminate after self.i bytes
1500 self.buffer.append(b)
1501 if len(self.buffer) == self.i:
1502 output += self.process_word()
1503 if final and self.buffer: # EOF terminates the last word
1504 output += self.process_word()
1505 return output
1506
1507 def process_word(self):
1508 output = ''
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001509 if self.buffer[0] == ord('i'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001510 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
Amaury Forgeot d'Arc7684f852008-05-03 12:21:13 +00001511 elif self.buffer[0] == ord('o'):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001512 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1513 else:
1514 output = self.buffer.decode('ascii')
1515 if len(output) < self.o:
1516 output += '-'*self.o # pad out with hyphens
1517 if self.o:
1518 output = output[:self.o] # truncate to output length
1519 output += '.'
1520 self.buffer = bytearray()
1521 return output
1522
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001523 codecEnabled = False
1524
1525 @classmethod
1526 def lookupTestDecoder(cls, name):
1527 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001528 latin1 = codecs.lookup('latin-1')
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001529 return codecs.CodecInfo(
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001530 name='test_decoder', encode=latin1.encode, decode=None,
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001531 incrementalencoder=None,
1532 streamreader=None, streamwriter=None,
1533 incrementaldecoder=cls)
1534
1535# Register the previous decoder for testing.
1536# Disabled by default, tests will enable it.
1537codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1538
1539
Christian Heimes1a6387e2008-03-26 12:49:49 +00001540class StatefulIncrementalDecoderTest(unittest.TestCase):
1541 """
1542 Make sure the StatefulIncrementalDecoder actually works.
1543 """
1544
1545 test_cases = [
1546 # I=1, O=1 (fixed-length input == fixed-length output)
1547 (b'abcd', False, 'a.b.c.d.'),
1548 # I=0, O=0 (variable-length input, variable-length output)
1549 (b'oiabcd', True, 'abcd.'),
1550 # I=0, O=0 (should ignore extra periods)
1551 (b'oi...abcd...', True, 'abcd.'),
1552 # I=0, O=6 (variable-length input, fixed-length output)
1553 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1554 # I=2, O=6 (fixed-length input < fixed-length output)
1555 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1556 # I=6, O=3 (fixed-length input > fixed-length output)
1557 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1558 # I=0, then 3; O=29, then 15 (with longer output)
1559 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1560 'a----------------------------.' +
1561 'b----------------------------.' +
1562 'cde--------------------------.' +
1563 'abcdefghijabcde.' +
1564 'a.b------------.' +
1565 '.c.------------.' +
1566 'd.e------------.' +
1567 'k--------------.' +
1568 'l--------------.' +
1569 'm--------------.')
1570 ]
1571
Antoine Pitrou19690592009-06-12 20:14:08 +00001572 def test_decoder(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001573 # Try a few one-shot test cases.
1574 for input, eof, output in self.test_cases:
1575 d = StatefulIncrementalDecoder()
1576 self.assertEquals(d.decode(input, eof), output)
1577
1578 # Also test an unfinished decode, followed by forcing EOF.
1579 d = StatefulIncrementalDecoder()
1580 self.assertEquals(d.decode(b'oiabcd'), '')
1581 self.assertEquals(d.decode(b'', 1), 'abcd.')
1582
1583class TextIOWrapperTest(unittest.TestCase):
1584
1585 def setUp(self):
1586 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1587 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Antoine Pitrou19690592009-06-12 20:14:08 +00001588 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001589
1590 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00001591 support.unlink(support.TESTFN)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001592
Antoine Pitrou19690592009-06-12 20:14:08 +00001593 def test_constructor(self):
1594 r = self.BytesIO(b"\xc3\xa9\n\n")
1595 b = self.BufferedReader(r, 1000)
1596 t = self.TextIOWrapper(b)
1597 t.__init__(b, encoding="latin1", newline="\r\n")
1598 self.assertEquals(t.encoding, "latin1")
1599 self.assertEquals(t.line_buffering, False)
1600 t.__init__(b, encoding="utf8", line_buffering=True)
1601 self.assertEquals(t.encoding, "utf8")
1602 self.assertEquals(t.line_buffering, True)
1603 self.assertEquals("\xe9\n", t.readline())
1604 self.assertRaises(TypeError, t.__init__, b, newline=42)
1605 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1606
1607 def test_detach(self):
1608 r = self.BytesIO()
1609 b = self.BufferedWriter(r)
1610 t = self.TextIOWrapper(b)
1611 self.assertIs(t.detach(), b)
1612
1613 t = self.TextIOWrapper(b, encoding="ascii")
1614 t.write("howdy")
1615 self.assertFalse(r.getvalue())
1616 t.detach()
1617 self.assertEqual(r.getvalue(), b"howdy")
1618 self.assertRaises(ValueError, t.detach)
1619
1620 def test_repr(self):
1621 raw = self.BytesIO("hello".encode("utf-8"))
1622 b = self.BufferedReader(raw)
1623 t = self.TextIOWrapper(b, encoding="utf-8")
1624 modname = self.TextIOWrapper.__module__
1625 self.assertEqual(repr(t),
1626 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1627 raw.name = "dummy"
1628 self.assertEqual(repr(t),
1629 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1630 raw.name = b"dummy"
1631 self.assertEqual(repr(t),
1632 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1633
1634 def test_line_buffering(self):
1635 r = self.BytesIO()
1636 b = self.BufferedWriter(r, 1000)
1637 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1638 t.write("X")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001639 self.assertEquals(r.getvalue(), b"") # No flush happened
Antoine Pitrou19690592009-06-12 20:14:08 +00001640 t.write("Y\nZ")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001641 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
Antoine Pitrou19690592009-06-12 20:14:08 +00001642 t.write("A\rB")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001643 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1644
Antoine Pitrou19690592009-06-12 20:14:08 +00001645 def test_encoding(self):
1646 # Check the encoding attribute is always set, and valid
1647 b = self.BytesIO()
1648 t = self.TextIOWrapper(b, encoding="utf8")
1649 self.assertEqual(t.encoding, "utf8")
1650 t = self.TextIOWrapper(b)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001651 self.assertTrue(t.encoding is not None)
Antoine Pitrou19690592009-06-12 20:14:08 +00001652 codecs.lookup(t.encoding)
1653
1654 def test_encoding_errors_reading(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001655 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001656 b = self.BytesIO(b"abc\n\xff\n")
1657 t = self.TextIOWrapper(b, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001658 self.assertRaises(UnicodeError, t.read)
1659 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001660 b = self.BytesIO(b"abc\n\xff\n")
1661 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001662 self.assertRaises(UnicodeError, t.read)
1663 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001664 b = self.BytesIO(b"abc\n\xff\n")
1665 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001666 self.assertEquals(t.read(), "abc\n\n")
1667 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001668 b = self.BytesIO(b"abc\n\xff\n")
1669 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
1670 self.assertEquals(t.read(), "abc\n\ufffd\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001671
Antoine Pitrou19690592009-06-12 20:14:08 +00001672 def test_encoding_errors_writing(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001673 # (1) default
Antoine Pitrou19690592009-06-12 20:14:08 +00001674 b = self.BytesIO()
1675 t = self.TextIOWrapper(b, encoding="ascii")
1676 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001677 # (2) explicit strict
Antoine Pitrou19690592009-06-12 20:14:08 +00001678 b = self.BytesIO()
1679 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1680 self.assertRaises(UnicodeError, t.write, "\xff")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001681 # (3) ignore
Antoine Pitrou19690592009-06-12 20:14:08 +00001682 b = self.BytesIO()
1683 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001684 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001685 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001686 t.flush()
1687 self.assertEquals(b.getvalue(), b"abcdef\n")
1688 # (4) replace
Antoine Pitrou19690592009-06-12 20:14:08 +00001689 b = self.BytesIO()
1690 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Christian Heimes1a6387e2008-03-26 12:49:49 +00001691 newline="\n")
Antoine Pitrou19690592009-06-12 20:14:08 +00001692 t.write("abc\xffdef\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001693 t.flush()
1694 self.assertEquals(b.getvalue(), b"abc?def\n")
1695
Antoine Pitrou19690592009-06-12 20:14:08 +00001696 def test_newlines(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001697 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1698
1699 tests = [
1700 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1701 [ '', input_lines ],
1702 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1703 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1704 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1705 ]
Antoine Pitrou655fbf12008-12-14 17:40:51 +00001706 encodings = (
1707 'utf-8', 'latin-1',
1708 'utf-16', 'utf-16-le', 'utf-16-be',
1709 'utf-32', 'utf-32-le', 'utf-32-be',
1710 )
Christian Heimes1a6387e2008-03-26 12:49:49 +00001711
1712 # Try a range of buffer sizes to test the case where \r is the last
1713 # character in TextIOWrapper._pending_line.
1714 for encoding in encodings:
1715 # XXX: str.encode() should return bytes
1716 data = bytes(''.join(input_lines).encode(encoding))
1717 for do_reads in (False, True):
1718 for bufsize in range(1, 10):
1719 for newline, exp_lines in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001720 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1721 textio = self.TextIOWrapper(bufio, newline=newline,
Christian Heimes1a6387e2008-03-26 12:49:49 +00001722 encoding=encoding)
1723 if do_reads:
1724 got_lines = []
1725 while True:
1726 c2 = textio.read(2)
1727 if c2 == '':
1728 break
1729 self.assertEquals(len(c2), 2)
1730 got_lines.append(c2 + textio.readline())
1731 else:
1732 got_lines = list(textio)
1733
1734 for got_line, exp_line in zip(got_lines, exp_lines):
1735 self.assertEquals(got_line, exp_line)
1736 self.assertEquals(len(got_lines), len(exp_lines))
1737
Antoine Pitrou19690592009-06-12 20:14:08 +00001738 def test_newlines_input(self):
1739 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001740 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1741 for newline, expected in [
1742 (None, normalized.decode("ascii").splitlines(True)),
1743 ("", testdata.decode("ascii").splitlines(True)),
Antoine Pitrou19690592009-06-12 20:14:08 +00001744 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1745 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1746 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Christian Heimes1a6387e2008-03-26 12:49:49 +00001747 ]:
Antoine Pitrou19690592009-06-12 20:14:08 +00001748 buf = self.BytesIO(testdata)
1749 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001750 self.assertEquals(txt.readlines(), expected)
1751 txt.seek(0)
1752 self.assertEquals(txt.read(), "".join(expected))
1753
Antoine Pitrou19690592009-06-12 20:14:08 +00001754 def test_newlines_output(self):
1755 testdict = {
1756 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1757 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1758 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1759 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1760 }
1761 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1762 for newline, expected in tests:
1763 buf = self.BytesIO()
1764 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1765 txt.write("AAA\nB")
1766 txt.write("BB\nCCC\n")
1767 txt.write("X\rY\r\nZ")
1768 txt.flush()
1769 self.assertEquals(buf.closed, False)
1770 self.assertEquals(buf.getvalue(), expected)
1771
1772 def test_destructor(self):
1773 l = []
1774 base = self.BytesIO
1775 class MyBytesIO(base):
1776 def close(self):
1777 l.append(self.getvalue())
1778 base.close(self)
1779 b = MyBytesIO()
1780 t = self.TextIOWrapper(b, encoding="ascii")
1781 t.write("abc")
1782 del t
1783 support.gc_collect()
1784 self.assertEquals([b"abc"], l)
1785
1786 def test_override_destructor(self):
1787 record = []
1788 class MyTextIO(self.TextIOWrapper):
1789 def __del__(self):
1790 record.append(1)
1791 try:
1792 f = super(MyTextIO, self).__del__
1793 except AttributeError:
1794 pass
1795 else:
1796 f()
1797 def close(self):
1798 record.append(2)
1799 super(MyTextIO, self).close()
1800 def flush(self):
1801 record.append(3)
1802 super(MyTextIO, self).flush()
1803 b = self.BytesIO()
1804 t = MyTextIO(b, encoding="ascii")
1805 del t
1806 support.gc_collect()
1807 self.assertEqual(record, [1, 2, 3])
1808
1809 def test_error_through_destructor(self):
1810 # Test that the exception state is not modified by a destructor,
1811 # even if close() fails.
1812 rawio = self.CloseFailureIO()
1813 def f():
1814 self.TextIOWrapper(rawio).xyzzy
1815 with support.captured_output("stderr") as s:
1816 self.assertRaises(AttributeError, f)
1817 s = s.getvalue().strip()
1818 if s:
1819 # The destructor *may* have printed an unraisable error, check it
1820 self.assertEqual(len(s.splitlines()), 1)
Benjamin Peterson5c8da862009-06-30 22:57:08 +00001821 self.assertTrue(s.startswith("Exception IOError: "), s)
1822 self.assertTrue(s.endswith(" ignored"), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001823
1824 # Systematic tests of the text I/O API
1825
Antoine Pitrou19690592009-06-12 20:14:08 +00001826 def test_basic_io(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001827 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1828 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Antoine Pitrou19690592009-06-12 20:14:08 +00001829 f = self.open(support.TESTFN, "w+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001830 f._CHUNK_SIZE = chunksize
Antoine Pitrou19690592009-06-12 20:14:08 +00001831 self.assertEquals(f.write("abc"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001832 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001833 f = self.open(support.TESTFN, "r+", encoding=enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001834 f._CHUNK_SIZE = chunksize
1835 self.assertEquals(f.tell(), 0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001836 self.assertEquals(f.read(), "abc")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001837 cookie = f.tell()
1838 self.assertEquals(f.seek(0), 0)
Benjamin Petersonddd392c2009-12-13 19:19:07 +00001839 self.assertEquals(f.read(None), "abc")
1840 f.seek(0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001841 self.assertEquals(f.read(2), "ab")
1842 self.assertEquals(f.read(1), "c")
1843 self.assertEquals(f.read(1), "")
1844 self.assertEquals(f.read(), "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001845 self.assertEquals(f.tell(), cookie)
1846 self.assertEquals(f.seek(0), 0)
1847 self.assertEquals(f.seek(0, 2), cookie)
Antoine Pitrou19690592009-06-12 20:14:08 +00001848 self.assertEquals(f.write("def"), 3)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001849 self.assertEquals(f.seek(cookie), cookie)
Antoine Pitrou19690592009-06-12 20:14:08 +00001850 self.assertEquals(f.read(), "def")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001851 if enc.startswith("utf"):
1852 self.multi_line_test(f, enc)
1853 f.close()
1854
1855 def multi_line_test(self, f, enc):
1856 f.seek(0)
1857 f.truncate()
Antoine Pitrou19690592009-06-12 20:14:08 +00001858 sample = "s\xff\u0fff\uffff"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001859 wlines = []
1860 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
1861 chars = []
1862 for i in range(size):
1863 chars.append(sample[i % len(sample)])
Antoine Pitrou19690592009-06-12 20:14:08 +00001864 line = "".join(chars) + "\n"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001865 wlines.append((f.tell(), line))
1866 f.write(line)
1867 f.seek(0)
1868 rlines = []
1869 while True:
1870 pos = f.tell()
1871 line = f.readline()
1872 if not line:
1873 break
1874 rlines.append((pos, line))
1875 self.assertEquals(rlines, wlines)
1876
Antoine Pitrou19690592009-06-12 20:14:08 +00001877 def test_telling(self):
1878 f = self.open(support.TESTFN, "w+", encoding="utf8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001879 p0 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001880 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001881 p1 = f.tell()
Antoine Pitrou19690592009-06-12 20:14:08 +00001882 f.write("\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001883 p2 = f.tell()
1884 f.seek(0)
1885 self.assertEquals(f.tell(), p0)
Antoine Pitrou19690592009-06-12 20:14:08 +00001886 self.assertEquals(f.readline(), "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001887 self.assertEquals(f.tell(), p1)
Antoine Pitrou19690592009-06-12 20:14:08 +00001888 self.assertEquals(f.readline(), "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001889 self.assertEquals(f.tell(), p2)
1890 f.seek(0)
1891 for line in f:
Antoine Pitrou19690592009-06-12 20:14:08 +00001892 self.assertEquals(line, "\xff\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001893 self.assertRaises(IOError, f.tell)
1894 self.assertEquals(f.tell(), p2)
1895 f.close()
1896
Antoine Pitrou19690592009-06-12 20:14:08 +00001897 def test_seeking(self):
1898 chunk_size = _default_chunk_size()
Christian Heimes1a6387e2008-03-26 12:49:49 +00001899 prefix_size = chunk_size - 2
1900 u_prefix = "a" * prefix_size
1901 prefix = bytes(u_prefix.encode("utf-8"))
1902 self.assertEquals(len(u_prefix), len(prefix))
1903 u_suffix = "\u8888\n"
1904 suffix = bytes(u_suffix.encode("utf-8"))
1905 line = prefix + suffix
Antoine Pitrou19690592009-06-12 20:14:08 +00001906 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001907 f.write(line*2)
1908 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001909 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001910 s = f.read(prefix_size)
Antoine Pitrou19690592009-06-12 20:14:08 +00001911 self.assertEquals(s, prefix.decode("ascii"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00001912 self.assertEquals(f.tell(), prefix_size)
1913 self.assertEquals(f.readline(), u_suffix)
1914
Antoine Pitrou19690592009-06-12 20:14:08 +00001915 def test_seeking_too(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001916 # Regression test for a specific bug
1917 data = b'\xe0\xbf\xbf\n'
Antoine Pitrou19690592009-06-12 20:14:08 +00001918 f = self.open(support.TESTFN, "wb")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001919 f.write(data)
1920 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001921 f = self.open(support.TESTFN, "r", encoding="utf-8")
Christian Heimes1a6387e2008-03-26 12:49:49 +00001922 f._CHUNK_SIZE # Just test that it exists
1923 f._CHUNK_SIZE = 2
1924 f.readline()
1925 f.tell()
1926
Antoine Pitrou19690592009-06-12 20:14:08 +00001927 def test_seek_and_tell(self):
1928 #Test seek/tell using the StatefulIncrementalDecoder.
1929 # Make test faster by doing smaller seeks
1930 CHUNK_SIZE = 128
Christian Heimes1a6387e2008-03-26 12:49:49 +00001931
Antoine Pitrou19690592009-06-12 20:14:08 +00001932 def test_seek_and_tell_with_data(data, min_pos=0):
Christian Heimes1a6387e2008-03-26 12:49:49 +00001933 """Tell/seek to various points within a data stream and ensure
1934 that the decoded data returned by read() is consistent."""
Antoine Pitrou19690592009-06-12 20:14:08 +00001935 f = self.open(support.TESTFN, 'wb')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001936 f.write(data)
1937 f.close()
Antoine Pitrou19690592009-06-12 20:14:08 +00001938 f = self.open(support.TESTFN, encoding='test_decoder')
1939 f._CHUNK_SIZE = CHUNK_SIZE
Christian Heimes1a6387e2008-03-26 12:49:49 +00001940 decoded = f.read()
1941 f.close()
1942
1943 for i in range(min_pos, len(decoded) + 1): # seek positions
1944 for j in [1, 5, len(decoded) - i]: # read lengths
Antoine Pitrou19690592009-06-12 20:14:08 +00001945 f = self.open(support.TESTFN, encoding='test_decoder')
Christian Heimes1a6387e2008-03-26 12:49:49 +00001946 self.assertEquals(f.read(i), decoded[:i])
1947 cookie = f.tell()
1948 self.assertEquals(f.read(j), decoded[i:i + j])
1949 f.seek(cookie)
1950 self.assertEquals(f.read(), decoded[i:])
1951 f.close()
1952
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001953 # Enable the test decoder.
1954 StatefulIncrementalDecoder.codecEnabled = 1
Christian Heimes1a6387e2008-03-26 12:49:49 +00001955
1956 # Run the tests.
1957 try:
1958 # Try each test case.
1959 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Antoine Pitrou19690592009-06-12 20:14:08 +00001960 test_seek_and_tell_with_data(input)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001961
1962 # Position each test case so that it crosses a chunk boundary.
Christian Heimes1a6387e2008-03-26 12:49:49 +00001963 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1964 offset = CHUNK_SIZE - len(input)//2
1965 prefix = b'.'*offset
1966 # Don't bother seeking into the prefix (takes too long).
1967 min_pos = offset*2
Antoine Pitrou19690592009-06-12 20:14:08 +00001968 test_seek_and_tell_with_data(prefix + input, min_pos)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001969
1970 # Ensure our test decoder won't interfere with subsequent tests.
1971 finally:
Amaury Forgeot d'Arcf0a49702008-04-01 22:52:48 +00001972 StatefulIncrementalDecoder.codecEnabled = 0
Christian Heimes1a6387e2008-03-26 12:49:49 +00001973
Antoine Pitrou19690592009-06-12 20:14:08 +00001974 def test_encoded_writes(self):
1975 data = "1234567890"
Christian Heimes1a6387e2008-03-26 12:49:49 +00001976 tests = ("utf-16",
1977 "utf-16-le",
1978 "utf-16-be",
1979 "utf-32",
1980 "utf-32-le",
1981 "utf-32-be")
1982 for encoding in tests:
Antoine Pitrou19690592009-06-12 20:14:08 +00001983 buf = self.BytesIO()
1984 f = self.TextIOWrapper(buf, encoding=encoding)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001985 # Check if the BOM is written only once (see issue1753).
1986 f.write(data)
1987 f.write(data)
1988 f.seek(0)
1989 self.assertEquals(f.read(), data * 2)
Antoine Pitrou19690592009-06-12 20:14:08 +00001990 f.seek(0)
1991 self.assertEquals(f.read(), data * 2)
Christian Heimes1a6387e2008-03-26 12:49:49 +00001992 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1993
Antoine Pitrou19690592009-06-12 20:14:08 +00001994 def test_unreadable(self):
1995 class UnReadable(self.BytesIO):
1996 def readable(self):
1997 return False
1998 txt = self.TextIOWrapper(UnReadable())
1999 self.assertRaises(IOError, txt.read)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002000
Antoine Pitrou19690592009-06-12 20:14:08 +00002001 def test_read_one_by_one(self):
2002 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002003 reads = ""
2004 while True:
2005 c = txt.read(1)
2006 if not c:
2007 break
2008 reads += c
2009 self.assertEquals(reads, "AA\nBB")
2010
Benjamin Petersonddd392c2009-12-13 19:19:07 +00002011 def test_readlines(self):
2012 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2013 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2014 txt.seek(0)
2015 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2016 txt.seek(0)
2017 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2018
Christian Heimes1a6387e2008-03-26 12:49:49 +00002019 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Antoine Pitrou19690592009-06-12 20:14:08 +00002020 def test_read_by_chunk(self):
Christian Heimes1a6387e2008-03-26 12:49:49 +00002021 # make sure "\r\n" straddles 128 char boundary.
Antoine Pitrou19690592009-06-12 20:14:08 +00002022 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002023 reads = ""
2024 while True:
2025 c = txt.read(128)
2026 if not c:
2027 break
2028 reads += c
2029 self.assertEquals(reads, "A"*127+"\nB")
2030
2031 def test_issue1395_1(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002032 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002033
2034 # read one char at a time
2035 reads = ""
2036 while True:
2037 c = txt.read(1)
2038 if not c:
2039 break
2040 reads += c
2041 self.assertEquals(reads, self.normalized)
2042
2043 def test_issue1395_2(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002044 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002045 txt._CHUNK_SIZE = 4
2046
2047 reads = ""
2048 while True:
2049 c = txt.read(4)
2050 if not c:
2051 break
2052 reads += c
2053 self.assertEquals(reads, self.normalized)
2054
2055 def test_issue1395_3(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002056 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002057 txt._CHUNK_SIZE = 4
2058
2059 reads = txt.read(4)
2060 reads += txt.read(4)
2061 reads += txt.readline()
2062 reads += txt.readline()
2063 reads += txt.readline()
2064 self.assertEquals(reads, self.normalized)
2065
2066 def test_issue1395_4(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002067 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002068 txt._CHUNK_SIZE = 4
2069
2070 reads = txt.read(4)
2071 reads += txt.read()
2072 self.assertEquals(reads, self.normalized)
2073
2074 def test_issue1395_5(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002075 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002076 txt._CHUNK_SIZE = 4
2077
2078 reads = txt.read(4)
2079 pos = txt.tell()
2080 txt.seek(0)
2081 txt.seek(pos)
2082 self.assertEquals(txt.read(4), "BBB\n")
2083
2084 def test_issue2282(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002085 buffer = self.BytesIO(self.testdata)
2086 txt = self.TextIOWrapper(buffer, encoding="ascii")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002087
2088 self.assertEqual(buffer.seekable(), txt.seekable())
2089
Antoine Pitrou19690592009-06-12 20:14:08 +00002090 @unittest.skip("Issue #6213 with incremental encoders")
2091 def test_append_bom(self):
2092 # The BOM is not written again when appending to a non-empty file
2093 filename = support.TESTFN
2094 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2095 with self.open(filename, 'w', encoding=charset) as f:
2096 f.write('aaa')
2097 pos = f.tell()
2098 with self.open(filename, 'rb') as f:
2099 self.assertEquals(f.read(), 'aaa'.encode(charset))
2100
2101 with self.open(filename, 'a', encoding=charset) as f:
2102 f.write('xxx')
2103 with self.open(filename, 'rb') as f:
2104 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
2105
2106 @unittest.skip("Issue #6213 with incremental encoders")
2107 def test_seek_bom(self):
2108 # Same test, but when seeking manually
2109 filename = support.TESTFN
2110 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2111 with self.open(filename, 'w', encoding=charset) as f:
2112 f.write('aaa')
2113 pos = f.tell()
2114 with self.open(filename, 'r+', encoding=charset) as f:
2115 f.seek(pos)
2116 f.write('zzz')
2117 f.seek(0)
2118 f.write('bbb')
2119 with self.open(filename, 'rb') as f:
2120 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2121
2122 def test_errors_property(self):
2123 with self.open(support.TESTFN, "w") as f:
2124 self.assertEqual(f.errors, "strict")
2125 with self.open(support.TESTFN, "w", errors="replace") as f:
2126 self.assertEqual(f.errors, "replace")
2127
Victor Stinner6a102812010-04-27 23:55:59 +00002128 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcfff896b2009-08-29 18:14:40 +00002129 def test_threads_write(self):
2130 # Issue6750: concurrent writes could duplicate data
2131 event = threading.Event()
2132 with self.open(support.TESTFN, "w", buffering=1) as f:
2133 def run(n):
2134 text = "Thread%03d\n" % n
2135 event.wait()
2136 f.write(text)
2137 threads = [threading.Thread(target=lambda n=x: run(n))
2138 for x in range(20)]
2139 for t in threads:
2140 t.start()
2141 time.sleep(0.02)
2142 event.set()
2143 for t in threads:
2144 t.join()
2145 with self.open(support.TESTFN) as f:
2146 content = f.read()
2147 for n in range(20):
2148 self.assertEquals(content.count("Thread%03d\n" % n), 1)
2149
Antoine Pitrouf7fd8e42010-05-03 16:25:33 +00002150 def test_flush_error_on_close(self):
2151 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2152 def bad_flush():
2153 raise IOError()
2154 txt.flush = bad_flush
2155 self.assertRaises(IOError, txt.close) # exception not swallowed
2156
2157 def test_multi_close(self):
2158 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2159 txt.close()
2160 txt.close()
2161 txt.close()
2162 self.assertRaises(ValueError, txt.flush)
2163
Antoine Pitrou19690592009-06-12 20:14:08 +00002164class CTextIOWrapperTest(TextIOWrapperTest):
2165
2166 def test_initialization(self):
2167 r = self.BytesIO(b"\xc3\xa9\n\n")
2168 b = self.BufferedReader(r, 1000)
2169 t = self.TextIOWrapper(b)
2170 self.assertRaises(TypeError, t.__init__, b, newline=42)
2171 self.assertRaises(ValueError, t.read)
2172 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2173 self.assertRaises(ValueError, t.read)
2174
2175 def test_garbage_collection(self):
2176 # C TextIOWrapper objects are collected, and collecting them flushes
2177 # all data to disk.
2178 # The Python version has __del__, so it ends in gc.garbage instead.
2179 rawio = io.FileIO(support.TESTFN, "wb")
2180 b = self.BufferedWriter(rawio)
2181 t = self.TextIOWrapper(b, encoding="ascii")
2182 t.write("456def")
2183 t.x = t
2184 wr = weakref.ref(t)
2185 del t
2186 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002187 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotocdcd4bf2009-06-17 07:05:33 +00002188 with self.open(support.TESTFN, "rb") as f:
Antoine Pitrou19690592009-06-12 20:14:08 +00002189 self.assertEqual(f.read(), b"456def")
2190
2191class PyTextIOWrapperTest(TextIOWrapperTest):
2192 pass
2193
2194
2195class IncrementalNewlineDecoderTest(unittest.TestCase):
2196
2197 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002198 # UTF-8 specific tests for a newline decoder
2199 def _check_decode(b, s, **kwargs):
2200 # We exercise getstate() / setstate() as well as decode()
2201 state = decoder.getstate()
2202 self.assertEquals(decoder.decode(b, **kwargs), s)
2203 decoder.setstate(state)
2204 self.assertEquals(decoder.decode(b, **kwargs), s)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002205
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002206 _check_decode(b'\xe8\xa2\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002207
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002208 _check_decode(b'\xe8', "")
2209 _check_decode(b'\xa2', "")
2210 _check_decode(b'\x88', "\u8888")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002211
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002212 _check_decode(b'\xe8', "")
2213 _check_decode(b'\xa2', "")
2214 _check_decode(b'\x88', "\u8888")
2215
2216 _check_decode(b'\xe8', "")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002217 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2218
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002219 decoder.reset()
2220 _check_decode(b'\n', "\n")
2221 _check_decode(b'\r', "")
2222 _check_decode(b'', "\n", final=True)
2223 _check_decode(b'\r', "\n", final=True)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002224
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002225 _check_decode(b'\r', "")
2226 _check_decode(b'a', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002227
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002228 _check_decode(b'\r\r\n', "\n\n")
2229 _check_decode(b'\r', "")
2230 _check_decode(b'\r', "\n")
2231 _check_decode(b'\na', "\na")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002232
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002233 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2234 _check_decode(b'\xe8\xa2\x88', "\u8888")
2235 _check_decode(b'\n', "\n")
2236 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2237 _check_decode(b'\n', "\n")
Christian Heimes1a6387e2008-03-26 12:49:49 +00002238
Antoine Pitrou19690592009-06-12 20:14:08 +00002239 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002240 result = []
Antoine Pitrou19690592009-06-12 20:14:08 +00002241 if encoding is not None:
2242 encoder = codecs.getincrementalencoder(encoding)()
2243 def _decode_bytewise(s):
2244 # Decode one byte at a time
2245 for b in encoder.encode(s):
2246 result.append(decoder.decode(b))
2247 else:
2248 encoder = None
2249 def _decode_bytewise(s):
2250 # Decode one char at a time
2251 for c in s:
2252 result.append(decoder.decode(c))
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002253 self.assertEquals(decoder.newlines, None)
2254 _decode_bytewise("abc\n\r")
2255 self.assertEquals(decoder.newlines, '\n')
2256 _decode_bytewise("\nabc")
2257 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2258 _decode_bytewise("abc\r")
2259 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2260 _decode_bytewise("abc")
2261 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2262 _decode_bytewise("abc\r")
2263 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2264 decoder.reset()
Antoine Pitrou19690592009-06-12 20:14:08 +00002265 input = "abc"
2266 if encoder is not None:
2267 encoder.reset()
2268 input = encoder.encode(input)
2269 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002270 self.assertEquals(decoder.newlines, None)
2271
2272 def test_newline_decoder(self):
2273 encodings = (
Antoine Pitrou19690592009-06-12 20:14:08 +00002274 # None meaning the IncrementalNewlineDecoder takes unicode input
2275 # rather than bytes input
2276 None, 'utf-8', 'latin-1',
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002277 'utf-16', 'utf-16-le', 'utf-16-be',
2278 'utf-32', 'utf-32-le', 'utf-32-be',
2279 )
2280 for enc in encodings:
Antoine Pitrou19690592009-06-12 20:14:08 +00002281 decoder = enc and codecs.getincrementaldecoder(enc)()
2282 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2283 self.check_newline_decoding(decoder, enc)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002284 decoder = codecs.getincrementaldecoder("utf-8")()
Antoine Pitrou19690592009-06-12 20:14:08 +00002285 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2286 self.check_newline_decoding_utf8(decoder)
2287
2288 def test_newline_bytes(self):
2289 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2290 def _check(dec):
2291 self.assertEquals(dec.newlines, None)
2292 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2293 self.assertEquals(dec.newlines, None)
2294 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2295 self.assertEquals(dec.newlines, None)
2296 dec = self.IncrementalNewlineDecoder(None, translate=False)
2297 _check(dec)
2298 dec = self.IncrementalNewlineDecoder(None, translate=True)
2299 _check(dec)
2300
2301class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2302 pass
2303
2304class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2305 pass
Antoine Pitrou655fbf12008-12-14 17:40:51 +00002306
Christian Heimes1a6387e2008-03-26 12:49:49 +00002307
2308# XXX Tests for open()
2309
2310class MiscIOTest(unittest.TestCase):
2311
Benjamin Petersonad100c32008-11-20 22:06:22 +00002312 def tearDown(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002313 support.unlink(support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002314
Antoine Pitrou19690592009-06-12 20:14:08 +00002315 def test___all__(self):
2316 for name in self.io.__all__:
2317 obj = getattr(self.io, name, None)
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002318 self.assertTrue(obj is not None, name)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002319 if name == "open":
2320 continue
Antoine Pitrou19690592009-06-12 20:14:08 +00002321 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Peterson3633c4f2009-04-02 01:03:17 +00002322 self.assertTrue(issubclass(obj, Exception), name)
2323 elif not name.startswith("SEEK_"):
Antoine Pitrou19690592009-06-12 20:14:08 +00002324 self.assertTrue(issubclass(obj, self.IOBase))
Christian Heimes1a6387e2008-03-26 12:49:49 +00002325
Benjamin Petersonad100c32008-11-20 22:06:22 +00002326 def test_attributes(self):
Antoine Pitrou19690592009-06-12 20:14:08 +00002327 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002328 self.assertEquals(f.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002329 f.close()
2330
Antoine Pitrou19690592009-06-12 20:14:08 +00002331 f = self.open(support.TESTFN, "U")
2332 self.assertEquals(f.name, support.TESTFN)
2333 self.assertEquals(f.buffer.name, support.TESTFN)
2334 self.assertEquals(f.buffer.raw.name, support.TESTFN)
Benjamin Petersonad100c32008-11-20 22:06:22 +00002335 self.assertEquals(f.mode, "U")
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002336 self.assertEquals(f.buffer.mode, "rb")
2337 self.assertEquals(f.buffer.raw.mode, "rb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002338 f.close()
2339
Antoine Pitrou19690592009-06-12 20:14:08 +00002340 f = self.open(support.TESTFN, "w+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002341 self.assertEquals(f.mode, "w+")
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002342 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2343 self.assertEquals(f.buffer.raw.mode, "rb+")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002344
Antoine Pitrou19690592009-06-12 20:14:08 +00002345 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Petersonbfc51562008-11-22 01:59:15 +00002346 self.assertEquals(g.mode, "wb")
2347 self.assertEquals(g.raw.mode, "wb")
Benjamin Petersonad100c32008-11-20 22:06:22 +00002348 self.assertEquals(g.name, f.fileno())
2349 self.assertEquals(g.raw.name, f.fileno())
2350 f.close()
2351 g.close()
2352
Antoine Pitrou19690592009-06-12 20:14:08 +00002353 def test_io_after_close(self):
2354 for kwargs in [
2355 {"mode": "w"},
2356 {"mode": "wb"},
2357 {"mode": "w", "buffering": 1},
2358 {"mode": "w", "buffering": 2},
2359 {"mode": "wb", "buffering": 0},
2360 {"mode": "r"},
2361 {"mode": "rb"},
2362 {"mode": "r", "buffering": 1},
2363 {"mode": "r", "buffering": 2},
2364 {"mode": "rb", "buffering": 0},
2365 {"mode": "w+"},
2366 {"mode": "w+b"},
2367 {"mode": "w+", "buffering": 1},
2368 {"mode": "w+", "buffering": 2},
2369 {"mode": "w+b", "buffering": 0},
2370 ]:
2371 f = self.open(support.TESTFN, **kwargs)
2372 f.close()
2373 self.assertRaises(ValueError, f.flush)
2374 self.assertRaises(ValueError, f.fileno)
2375 self.assertRaises(ValueError, f.isatty)
2376 self.assertRaises(ValueError, f.__iter__)
2377 if hasattr(f, "peek"):
2378 self.assertRaises(ValueError, f.peek, 1)
2379 self.assertRaises(ValueError, f.read)
2380 if hasattr(f, "read1"):
2381 self.assertRaises(ValueError, f.read1, 1024)
2382 if hasattr(f, "readinto"):
2383 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2384 self.assertRaises(ValueError, f.readline)
2385 self.assertRaises(ValueError, f.readlines)
2386 self.assertRaises(ValueError, f.seek, 0)
2387 self.assertRaises(ValueError, f.tell)
2388 self.assertRaises(ValueError, f.truncate)
2389 self.assertRaises(ValueError, f.write,
2390 b"" if "b" in kwargs['mode'] else "")
2391 self.assertRaises(ValueError, f.writelines, [])
2392 self.assertRaises(ValueError, next, f)
2393
2394 def test_blockingioerror(self):
2395 # Various BlockingIOError issues
2396 self.assertRaises(TypeError, self.BlockingIOError)
2397 self.assertRaises(TypeError, self.BlockingIOError, 1)
2398 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2399 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2400 b = self.BlockingIOError(1, "")
2401 self.assertEqual(b.characters_written, 0)
2402 class C(unicode):
2403 pass
2404 c = C("")
2405 b = self.BlockingIOError(1, c)
2406 c.b = b
2407 b.c = c
2408 wr = weakref.ref(c)
2409 del c, b
2410 support.gc_collect()
Benjamin Peterson5c8da862009-06-30 22:57:08 +00002411 self.assertTrue(wr() is None, wr)
Antoine Pitrou19690592009-06-12 20:14:08 +00002412
2413 def test_abcs(self):
2414 # Test the visible base classes are ABCs.
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002415 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2416 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2417 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2418 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Antoine Pitrou19690592009-06-12 20:14:08 +00002419
2420 def _check_abc_inheritance(self, abcmodule):
2421 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002422 self.assertIsInstance(f, abcmodule.IOBase)
2423 self.assertIsInstance(f, abcmodule.RawIOBase)
2424 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2425 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002426 with self.open(support.TESTFN, "wb") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002427 self.assertIsInstance(f, abcmodule.IOBase)
2428 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2429 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2430 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002431 with self.open(support.TESTFN, "w") as f:
Ezio Melottib0f5adc2010-01-24 16:58:36 +00002432 self.assertIsInstance(f, abcmodule.IOBase)
2433 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2434 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2435 self.assertIsInstance(f, abcmodule.TextIOBase)
Antoine Pitrou19690592009-06-12 20:14:08 +00002436
2437 def test_abc_inheritance(self):
2438 # Test implementations inherit from their respective ABCs
2439 self._check_abc_inheritance(self)
2440
2441 def test_abc_inheritance_official(self):
2442 # Test implementations inherit from the official ABCs of the
2443 # baseline "io" module.
2444 self._check_abc_inheritance(io)
2445
2446class CMiscIOTest(MiscIOTest):
2447 io = io
2448
2449class PyMiscIOTest(MiscIOTest):
2450 io = pyio
Benjamin Petersonad100c32008-11-20 22:06:22 +00002451
Christian Heimes1a6387e2008-03-26 12:49:49 +00002452def test_main():
Antoine Pitrou19690592009-06-12 20:14:08 +00002453 tests = (CIOTest, PyIOTest,
2454 CBufferedReaderTest, PyBufferedReaderTest,
2455 CBufferedWriterTest, PyBufferedWriterTest,
2456 CBufferedRWPairTest, PyBufferedRWPairTest,
2457 CBufferedRandomTest, PyBufferedRandomTest,
2458 StatefulIncrementalDecoderTest,
2459 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2460 CTextIOWrapperTest, PyTextIOWrapperTest,
2461 CMiscIOTest, PyMiscIOTest,
2462 )
2463
2464 # Put the namespaces of the IO module we are testing and some useful mock
2465 # classes in the __dict__ of each test.
2466 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
2467 MockNonBlockWriterIO)
2468 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2469 c_io_ns = dict((name, getattr(io, name)) for name in all_members)
2470 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
2471 globs = globals()
2472 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2473 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2474 # Avoid turning open into a bound method.
2475 py_io_ns["open"] = pyio.OpenWrapper
2476 for test in tests:
2477 if test.__name__.startswith("C"):
2478 for name, obj in c_io_ns.items():
2479 setattr(test, name, obj)
2480 elif test.__name__.startswith("Py"):
2481 for name, obj in py_io_ns.items():
2482 setattr(test, name, obj)
2483
2484 support.run_unittest(*tests)
Christian Heimes1a6387e2008-03-26 12:49:49 +00002485
2486if __name__ == "__main__":
Antoine Pitrou19690592009-06-12 20:14:08 +00002487 test_main()