blob: ec390abb5c682f037536083ef98c63720b331a74 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
33from itertools import chain, cycle, count
34from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000035from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000036
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000037import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000038import io # C implementation of io
39import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000040
Guido van Rossuma9e20242007-03-08 00:43:48 +000041
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042def _default_chunk_size():
43 """Get the default TextIOWrapper chunk size"""
44 with open(__file__, "r", encoding="latin1") as f:
45 return f._CHUNK_SIZE
46
47
48class MockRawIO:
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000050 def __init__(self, read_stack=()):
51 self._read_stack = list(read_stack)
52 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000053 self._reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000054
55 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000056 self._reads += 1
Guido van Rossum68bbcd22007-02-27 17:19:33 +000057 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000058 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000059 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000060 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000061
Guido van Rossum01a27522007-03-07 01:00:12 +000062 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000063 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000064 return len(b)
65
66 def writable(self):
67 return True
68
Guido van Rossum68bbcd22007-02-27 17:19:33 +000069 def fileno(self):
70 return 42
71
72 def readable(self):
73 return True
74
Guido van Rossum01a27522007-03-07 01:00:12 +000075 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000076 return True
77
Guido van Rossum01a27522007-03-07 01:00:12 +000078 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000079 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000080
81 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000082 return 0 # same comment as above
83
84 def readinto(self, buf):
85 self._reads += 1
86 max_len = len(buf)
87 try:
88 data = self._read_stack[0]
89 except IndexError:
90 return 0
91 if data is None:
92 del self._read_stack[0]
93 return None
94 n = len(data)
95 if len(data) <= max_len:
96 del self._read_stack[0]
97 buf[:n] = data
98 return n
99 else:
100 buf[:] = data[:max_len]
101 self._read_stack[0] = data[max_len:]
102 return max_len
103
104 def truncate(self, pos=None):
105 return pos
106
107class CMockRawIO(MockRawIO, io.RawIOBase):
108 pass
109
110class PyMockRawIO(MockRawIO, pyio.RawIOBase):
111 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000112
Guido van Rossuma9e20242007-03-08 00:43:48 +0000113
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000114class MisbehavedRawIO(MockRawIO):
115 def write(self, b):
116 return super().write(b) * 2
117
118 def read(self, n=None):
119 return super().read(n) * 2
120
121 def seek(self, pos, whence):
122 return -123
123
124 def tell(self):
125 return -456
126
127 def readinto(self, buf):
128 super().readinto(buf)
129 return len(buf) * 5
130
131class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
132 pass
133
134class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
135 pass
136
137
138class CloseFailureIO(MockRawIO):
139 closed = 0
140
141 def close(self):
142 if not self.closed:
143 self.closed = 1
144 raise IOError
145
146class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
147 pass
148
149class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
150 pass
151
152
153class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000154
155 def __init__(self, data):
156 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000157 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000158
159 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000160 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000161 self.read_history.append(None if res is None else len(res))
162 return res
163
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000164 def readinto(self, b):
165 res = super().readinto(b)
166 self.read_history.append(res)
167 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000168
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169class CMockFileIO(MockFileIO, io.BytesIO):
170 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000171
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000172class PyMockFileIO(MockFileIO, pyio.BytesIO):
173 pass
174
175
176class MockNonBlockWriterIO:
177
178 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000179 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000181
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000182 def pop_written(self):
183 s = b"".join(self._write_stack)
184 self._write_stack[:] = []
185 return s
186
187 def block_on(self, char):
188 """Block when a given char is encountered."""
189 self._blocker_char = char
190
191 def readable(self):
192 return True
193
194 def seekable(self):
195 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000196
Guido van Rossum01a27522007-03-07 01:00:12 +0000197 def writable(self):
198 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000200 def write(self, b):
201 b = bytes(b)
202 n = -1
203 if self._blocker_char:
204 try:
205 n = b.index(self._blocker_char)
206 except ValueError:
207 pass
208 else:
209 self._blocker_char = None
210 self._write_stack.append(b[:n])
211 raise self.BlockingIOError(0, "test blocking", n)
212 self._write_stack.append(b)
213 return len(b)
214
215class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
216 BlockingIOError = io.BlockingIOError
217
218class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
219 BlockingIOError = pyio.BlockingIOError
220
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
Guido van Rossum28524c72007-02-27 05:47:44 +0000222class IOTest(unittest.TestCase):
223
Neal Norwitze7789b12008-03-24 06:18:09 +0000224 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000225 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000226
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000227 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000228 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000229
Guido van Rossum28524c72007-02-27 05:47:44 +0000230 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000231 self.assertEqual(f.write(b"blah."), 5)
232 self.assertEqual(f.seek(0), 0)
233 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000234 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000235 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000236 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000237 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000238 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000239 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000240 self.assertEqual(f.seek(-1, 2), 13)
241 self.assertEqual(f.tell(), 13)
242 self.assertEqual(f.truncate(12), 12)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000243 self.assertEqual(f.tell(), 12)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000244 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000245
Guido van Rossum9b76da62007-04-11 01:09:03 +0000246 def read_ops(self, f, buffered=False):
247 data = f.read(5)
248 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000249 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000250 self.assertEqual(f.readinto(data), 5)
251 self.assertEqual(data, b" worl")
252 self.assertEqual(f.readinto(data), 2)
253 self.assertEqual(len(data), 5)
254 self.assertEqual(data[:2], b"d\n")
255 self.assertEqual(f.seek(0), 0)
256 self.assertEqual(f.read(20), b"hello world\n")
257 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000258 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000259 self.assertEqual(f.seek(-6, 2), 6)
260 self.assertEqual(f.read(5), b"world")
261 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000262 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000263 self.assertEqual(f.seek(-6, 1), 5)
264 self.assertEqual(f.read(5), b" worl")
265 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000266 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000267 if buffered:
268 f.seek(0)
269 self.assertEqual(f.read(), b"hello world\n")
270 f.seek(6)
271 self.assertEqual(f.read(), b"world\n")
272 self.assertEqual(f.read(), b"")
273
Guido van Rossum34d69e52007-04-10 20:08:41 +0000274 LARGE = 2**31
275
Guido van Rossum53807da2007-04-10 19:01:47 +0000276 def large_file_ops(self, f):
277 assert f.readable()
278 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000279 self.assertEqual(f.seek(self.LARGE), self.LARGE)
280 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000281 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000282 self.assertEqual(f.tell(), self.LARGE + 3)
283 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000285 self.assertEqual(f.tell(), self.LARGE + 2)
286 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000287 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000288 self.assertEqual(f.tell(), self.LARGE + 1)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000289 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
290 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000291 self.assertEqual(f.read(2), b"x")
292
Guido van Rossum28524c72007-02-27 05:47:44 +0000293 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000294 with self.open(support.TESTFN, "wb", buffering=0) as f:
295 self.assertEqual(f.readable(), False)
296 self.assertEqual(f.writable(), True)
297 self.assertEqual(f.seekable(), True)
298 self.write_ops(f)
299 with self.open(support.TESTFN, "rb", buffering=0) as f:
300 self.assertEqual(f.readable(), True)
301 self.assertEqual(f.writable(), False)
302 self.assertEqual(f.seekable(), True)
303 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000304
Guido van Rossum87429772007-04-10 21:06:59 +0000305 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000306 with self.open(support.TESTFN, "wb") as f:
307 self.assertEqual(f.readable(), False)
308 self.assertEqual(f.writable(), True)
309 self.assertEqual(f.seekable(), True)
310 self.write_ops(f)
311 with self.open(support.TESTFN, "rb") as f:
312 self.assertEqual(f.readable(), True)
313 self.assertEqual(f.writable(), False)
314 self.assertEqual(f.seekable(), True)
315 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000316
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000317 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000318 with self.open(support.TESTFN, "wb") as f:
319 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
320 with self.open(support.TESTFN, "rb") as f:
321 self.assertEqual(f.readline(), b"abc\n")
322 self.assertEqual(f.readline(10), b"def\n")
323 self.assertEqual(f.readline(2), b"xy")
324 self.assertEqual(f.readline(4), b"zzy\n")
325 self.assertEqual(f.readline(), b"foo\x00bar\n")
326 self.assertEqual(f.readline(), b"another line")
327 self.assertRaises(TypeError, f.readline, 5.3)
328 with self.open(support.TESTFN, "r") as f:
329 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000330
Guido van Rossum28524c72007-02-27 05:47:44 +0000331 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000332 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000333 self.write_ops(f)
334 data = f.getvalue()
335 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000336 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000337 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000338
Guido van Rossum53807da2007-04-10 19:01:47 +0000339 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000340 # On Windows and Mac OSX this test comsumes large resources; It takes
341 # a long time to build the >2GB file and takes >2GB of disk space
342 # therefore the resource must be enabled to run this test.
343 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000344 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000345 print("\nTesting large file ops skipped on %s." % sys.platform,
346 file=sys.stderr)
347 print("It requires %d bytes and a long time." % self.LARGE,
348 file=sys.stderr)
349 print("Use 'regrtest.py -u largefile test_io' to run it.",
350 file=sys.stderr)
351 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000352 with self.open(support.TESTFN, "w+b", 0) as f:
353 self.large_file_ops(f)
354 with self.open(support.TESTFN, "w+b") as f:
355 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000356
357 def test_with_open(self):
358 for bufsize in (0, 1, 100):
359 f = None
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000360 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000361 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000362 self.assertEqual(f.closed, True)
363 f = None
364 try:
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000365 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000366 1/0
367 except ZeroDivisionError:
368 self.assertEqual(f.closed, True)
369 else:
370 self.fail("1/0 didn't raise an exception")
371
Antoine Pitrou08838b62009-01-21 00:55:13 +0000372 # issue 5008
373 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000374 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000375 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000376 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000377 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000378 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000379 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000380 with self.open(support.TESTFN, "a") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000381 self.assert_(f.tell() > 0)
382
Guido van Rossum87429772007-04-10 21:06:59 +0000383 def test_destructor(self):
384 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000385 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000386 def __del__(self):
387 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000388 try:
389 f = super().__del__
390 except AttributeError:
391 pass
392 else:
393 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000394 def close(self):
395 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000396 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000397 def flush(self):
398 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000399 super().flush()
400 f = MyFileIO(support.TESTFN, "wb")
401 f.write(b"xxx")
402 del f
403 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson45cec322009-04-24 23:14:50 +0000404 with open(support.TESTFN, "rb") as f:
405 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000406
407 def _check_base_destructor(self, base):
408 record = []
409 class MyIO(base):
410 def __init__(self):
411 # This exercises the availability of attributes on object
412 # destruction.
413 # (in the C version, close() is called by the tp_dealloc
414 # function, not by __del__)
415 self.on_del = 1
416 self.on_close = 2
417 self.on_flush = 3
418 def __del__(self):
419 record.append(self.on_del)
420 try:
421 f = super().__del__
422 except AttributeError:
423 pass
424 else:
425 f()
426 def close(self):
427 record.append(self.on_close)
428 super().close()
429 def flush(self):
430 record.append(self.on_flush)
431 super().flush()
432 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000433 del f
434 self.assertEqual(record, [1, 2, 3])
435
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000436 def test_IOBase_destructor(self):
437 self._check_base_destructor(self.IOBase)
438
439 def test_RawIOBase_destructor(self):
440 self._check_base_destructor(self.RawIOBase)
441
442 def test_BufferedIOBase_destructor(self):
443 self._check_base_destructor(self.BufferedIOBase)
444
445 def test_TextIOBase_destructor(self):
446 self._check_base_destructor(self.TextIOBase)
447
Guido van Rossum87429772007-04-10 21:06:59 +0000448 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000449 with self.open(support.TESTFN, "wb") as f:
450 f.write(b"xxx")
451 with self.open(support.TESTFN, "rb") as f:
452 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000453
Guido van Rossumd4103952007-04-12 05:44:49 +0000454 def test_array_writes(self):
455 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000456 n = len(a.tostring())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000457 with self.open(support.TESTFN, "wb", 0) as f:
458 self.assertEqual(f.write(a), n)
459 with self.open(support.TESTFN, "wb") as f:
460 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000461
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000462 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000463 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000464 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000465
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000466 def test_read_closed(self):
467 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000468 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000469 with self.open(support.TESTFN, "r") as f:
470 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000471 self.assertEqual(file.read(), "egg\n")
472 file.seek(0)
473 file.close()
474 self.assertRaises(ValueError, file.read)
475
476 def test_no_closefd_with_filename(self):
477 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000478 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000479
480 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000481 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000482 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000483 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000484 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000485 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000486 self.assertEqual(file.buffer.raw.closefd, False)
487
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000488 def test_garbage_collection(self):
489 # FileIO objects are collected, and collecting them flushes
490 # all data to disk.
491 f = self.FileIO(support.TESTFN, "wb")
492 f.write(b"abcxxx")
493 f.f = f
494 wr = weakref.ref(f)
495 del f
496 gc.collect()
497 self.assert_(wr() is None, wr)
498 with open(support.TESTFN, "rb") as f:
499 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000500
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000501 def test_unbounded_file(self):
502 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
503 zero = "/dev/zero"
504 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000505 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000506 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000507 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000508 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000509 self.skipTest("test requires at least 2GB of memory")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000510 with open(zero, "rb", buffering=0) as f:
511 self.assertRaises(OverflowError, f.read)
512 with open(zero, "rb") as f:
513 self.assertRaises(OverflowError, f.read)
514 with open(zero, "r") as f:
515 self.assertRaises(OverflowError, f.read)
516
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000517class CIOTest(IOTest):
518 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000519
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000520class PyIOTest(IOTest):
521 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000522
Guido van Rossuma9e20242007-03-08 00:43:48 +0000523
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000524class CommonBufferedTests:
525 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
526
527 def test_fileno(self):
528 rawio = self.MockRawIO()
529 bufio = self.tp(rawio)
530
531 self.assertEquals(42, bufio.fileno())
532
533 def test_no_fileno(self):
534 # XXX will we always have fileno() function? If so, kill
535 # this test. Else, write it.
536 pass
537
538 def test_invalid_args(self):
539 rawio = self.MockRawIO()
540 bufio = self.tp(rawio)
541 # Invalid whence
542 self.assertRaises(ValueError, bufio.seek, 0, -1)
543 self.assertRaises(ValueError, bufio.seek, 0, 3)
544
545 def test_override_destructor(self):
546 tp = self.tp
547 record = []
548 class MyBufferedIO(tp):
549 def __del__(self):
550 record.append(1)
551 try:
552 f = super().__del__
553 except AttributeError:
554 pass
555 else:
556 f()
557 def close(self):
558 record.append(2)
559 super().close()
560 def flush(self):
561 record.append(3)
562 super().flush()
563 rawio = self.MockRawIO()
564 bufio = MyBufferedIO(rawio)
565 writable = bufio.writable()
566 del bufio
567 if writable:
568 self.assertEqual(record, [1, 2, 3])
569 else:
570 self.assertEqual(record, [1, 2])
571
572 def test_context_manager(self):
573 # Test usability as a context manager
574 rawio = self.MockRawIO()
575 bufio = self.tp(rawio)
576 def _with():
577 with bufio:
578 pass
579 _with()
580 # bufio should now be closed, and using it a second time should raise
581 # a ValueError.
582 self.assertRaises(ValueError, _with)
583
584 def test_error_through_destructor(self):
585 # Test that the exception state is not modified by a destructor,
586 # even if close() fails.
587 rawio = self.CloseFailureIO()
588 def f():
589 self.tp(rawio).xyzzy
590 with support.captured_output("stderr") as s:
591 self.assertRaises(AttributeError, f)
592 s = s.getvalue().strip()
593 if s:
594 # The destructor *may* have printed an unraisable error, check it
595 self.assertEqual(len(s.splitlines()), 1)
596 self.assert_(s.startswith("Exception IOError: "), s)
597 self.assert_(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000598
599
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000600class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
601 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000602
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000603 def test_constructor(self):
604 rawio = self.MockRawIO([b"abc"])
605 bufio = self.tp(rawio)
606 bufio.__init__(rawio)
607 bufio.__init__(rawio, buffer_size=1024)
608 bufio.__init__(rawio, buffer_size=16)
609 self.assertEquals(b"abc", bufio.read())
610 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
611 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
612 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
613 rawio = self.MockRawIO([b"abc"])
614 bufio.__init__(rawio)
615 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000616
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000617 def test_read(self):
618 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
619 bufio = self.tp(rawio)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000620 self.assertEquals(b"abcdef", bufio.read(6))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000621 # Invalid args
622 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000623
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000624 def test_read1(self):
625 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
626 bufio = self.tp(rawio)
627 self.assertEquals(b"a", bufio.read(1))
628 self.assertEquals(b"b", bufio.read1(1))
629 self.assertEquals(rawio._reads, 1)
630 self.assertEquals(b"c", bufio.read1(100))
631 self.assertEquals(rawio._reads, 1)
632 self.assertEquals(b"d", bufio.read1(100))
633 self.assertEquals(rawio._reads, 2)
634 self.assertEquals(b"efg", bufio.read1(100))
635 self.assertEquals(rawio._reads, 3)
636 self.assertEquals(b"", bufio.read1(100))
637 # Invalid args
638 self.assertRaises(ValueError, bufio.read1, -1)
639
640 def test_readinto(self):
641 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
642 bufio = self.tp(rawio)
643 b = bytearray(2)
644 self.assertEquals(bufio.readinto(b), 2)
645 self.assertEquals(b, b"ab")
646 self.assertEquals(bufio.readinto(b), 2)
647 self.assertEquals(b, b"cd")
648 self.assertEquals(bufio.readinto(b), 2)
649 self.assertEquals(b, b"ef")
650 self.assertEquals(bufio.readinto(b), 1)
651 self.assertEquals(b, b"gf")
652 self.assertEquals(bufio.readinto(b), 0)
653 self.assertEquals(b, b"gf")
654
655 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000656 data = b"abcdefghi"
657 dlen = len(data)
658
659 tests = [
660 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
661 [ 100, [ 3, 3, 3], [ dlen ] ],
662 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
663 ]
664
665 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000666 rawio = self.MockFileIO(data)
667 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000668 pos = 0
669 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000670 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000671 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000672 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000673 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000674
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000675 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000676 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000677 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
678 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000679
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000680 self.assertEquals(b"abcd", bufio.read(6))
681 self.assertEquals(b"e", bufio.read(1))
682 self.assertEquals(b"fg", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000683 self.assertEquals(b"", bufio.peek(1))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000684 self.assert_(None is bufio.read())
685 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000686
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000687 def test_read_past_eof(self):
688 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
689 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000690
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000691 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000692
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000693 def test_read_all(self):
694 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
695 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000696
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000697 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000698
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000699 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000700 try:
701 # Write out many bytes with exactly the same number of 0's,
702 # 1's... 255's. This will help us check that concurrent reading
703 # doesn't duplicate or forget contents.
704 N = 1000
705 l = list(range(256)) * N
706 random.shuffle(l)
707 s = bytes(bytearray(l))
708 with io.open(support.TESTFN, "wb") as f:
709 f.write(s)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000710 with io.open(support.TESTFN, self.read_mode, buffering=0) as raw:
711 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000712 errors = []
713 results = []
714 def f():
715 try:
716 # Intra-buffer read then buffer-flushing read
717 for n in cycle([1, 19]):
718 s = bufio.read(n)
719 if not s:
720 break
721 # list.append() is atomic
722 results.append(s)
723 except Exception as e:
724 errors.append(e)
725 raise
726 threads = [threading.Thread(target=f) for x in range(20)]
727 for t in threads:
728 t.start()
729 time.sleep(0.02) # yield
730 for t in threads:
731 t.join()
732 self.assertFalse(errors,
733 "the following exceptions were caught: %r" % errors)
734 s = b''.join(results)
735 for i in range(256):
736 c = bytes(bytearray([i]))
737 self.assertEqual(s.count(c), N)
738 finally:
739 support.unlink(support.TESTFN)
740
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000741 def test_misbehaved_io(self):
742 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
743 bufio = self.tp(rawio)
744 self.assertRaises(IOError, bufio.seek, 0)
745 self.assertRaises(IOError, bufio.tell)
746
747class CBufferedReaderTest(BufferedReaderTest):
748 tp = io.BufferedReader
749
750 def test_constructor(self):
751 BufferedReaderTest.test_constructor(self)
752 # The allocation can succeed on 32-bit builds, e.g. with more
753 # than 2GB RAM and a 64-bit kernel.
754 if sys.maxsize > 0x7FFFFFFF:
755 rawio = self.MockRawIO()
756 bufio = self.tp(rawio)
757 self.assertRaises((OverflowError, MemoryError, ValueError),
758 bufio.__init__, rawio, sys.maxsize)
759
760 def test_initialization(self):
761 rawio = self.MockRawIO([b"abc"])
762 bufio = self.tp(rawio)
763 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
764 self.assertRaises(ValueError, bufio.read)
765 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
766 self.assertRaises(ValueError, bufio.read)
767 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
768 self.assertRaises(ValueError, bufio.read)
769
770 def test_misbehaved_io_read(self):
771 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
772 bufio = self.tp(rawio)
773 # _pyio.BufferedReader seems to implement reading different, so that
774 # checking this is not so easy.
775 self.assertRaises(IOError, bufio.read, 10)
776
777 def test_garbage_collection(self):
778 # C BufferedReader objects are collected.
779 # The Python version has __del__, so it ends into gc.garbage instead
780 rawio = self.FileIO(support.TESTFN, "w+b")
781 f = self.tp(rawio)
782 f.f = f
783 wr = weakref.ref(f)
784 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000785 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000786 self.assert_(wr() is None, wr)
787
788class PyBufferedReaderTest(BufferedReaderTest):
789 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000790
Guido van Rossuma9e20242007-03-08 00:43:48 +0000791
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000792class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
793 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000794
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000795 def test_constructor(self):
796 rawio = self.MockRawIO()
797 bufio = self.tp(rawio)
798 bufio.__init__(rawio)
799 bufio.__init__(rawio, buffer_size=1024)
800 bufio.__init__(rawio, buffer_size=16)
801 self.assertEquals(3, bufio.write(b"abc"))
802 bufio.flush()
803 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
804 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
805 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
806 bufio.__init__(rawio)
807 self.assertEquals(3, bufio.write(b"ghi"))
808 bufio.flush()
809 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
810
811 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000812 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000813 writer = self.MockRawIO()
814 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000815 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000816 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000817
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000818 def test_write_overflow(self):
819 writer = self.MockRawIO()
820 bufio = self.tp(writer, 8)
821 contents = b"abcdefghijklmnop"
822 for n in range(0, len(contents), 3):
823 bufio.write(contents[n:n+3])
824 flushed = b"".join(writer._write_stack)
825 # At least (total - 8) bytes were implicitly flushed, perhaps more
826 # depending on the implementation.
827 self.assert_(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000828
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000829 def check_writes(self, intermediate_func):
830 # Lots of writes, test the flushed output is as expected.
831 contents = bytes(range(256)) * 1000
832 n = 0
833 writer = self.MockRawIO()
834 bufio = self.tp(writer, 13)
835 # Generator of write sizes: repeat each N 15 times then proceed to N+1
836 def gen_sizes():
837 for size in count(1):
838 for i in range(15):
839 yield size
840 sizes = gen_sizes()
841 while n < len(contents):
842 size = min(next(sizes), len(contents) - n)
843 self.assertEquals(bufio.write(contents[n:n+size]), size)
844 intermediate_func(bufio)
845 n += size
846 bufio.flush()
847 self.assertEquals(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000848
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000849 def test_writes(self):
850 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000851
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000852 def test_writes_and_flushes(self):
853 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +0000854
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000855 def test_writes_and_seeks(self):
856 def _seekabs(bufio):
857 pos = bufio.tell()
858 bufio.seek(pos + 1, 0)
859 bufio.seek(pos - 1, 0)
860 bufio.seek(pos, 0)
861 self.check_writes(_seekabs)
862 def _seekrel(bufio):
863 pos = bufio.seek(0, 1)
864 bufio.seek(+1, 1)
865 bufio.seek(-1, 1)
866 bufio.seek(pos, 0)
867 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +0000868
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000869 def test_writes_and_truncates(self):
870 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +0000871
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000872 def test_write_non_blocking(self):
873 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +0000874 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +0000875
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000876 self.assertEquals(bufio.write(b"abcd"), 4)
877 self.assertEquals(bufio.write(b"efghi"), 5)
878 # 1 byte will be written, the rest will be buffered
879 raw.block_on(b"k")
880 self.assertEquals(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +0000881
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000882 # 8 bytes will be written, 8 will be buffered and the rest will be lost
883 raw.block_on(b"0")
884 try:
885 bufio.write(b"opqrwxyz0123456789")
886 except self.BlockingIOError as e:
887 written = e.characters_written
888 else:
889 self.fail("BlockingIOError should have been raised")
890 self.assertEquals(written, 16)
891 self.assertEquals(raw.pop_written(),
892 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +0000893
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000894 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
895 s = raw.pop_written()
896 # Previously buffered bytes were flushed
897 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +0000898
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000899 def test_write_and_rewind(self):
900 raw = io.BytesIO()
901 bufio = self.tp(raw, 4)
902 self.assertEqual(bufio.write(b"abcdef"), 6)
903 self.assertEqual(bufio.tell(), 6)
904 bufio.seek(0, 0)
905 self.assertEqual(bufio.write(b"XY"), 2)
906 bufio.seek(6, 0)
907 self.assertEqual(raw.getvalue(), b"XYcdef")
908 self.assertEqual(bufio.write(b"123456"), 6)
909 bufio.flush()
910 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000911
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000912 def test_flush(self):
913 writer = self.MockRawIO()
914 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000915 bufio.write(b"abc")
916 bufio.flush()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000917 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000918
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000919 def test_destructor(self):
920 writer = self.MockRawIO()
921 bufio = self.tp(writer, 8)
922 bufio.write(b"abc")
923 del bufio
924 self.assertEquals(b"abc", writer._write_stack[0])
925
926 def test_truncate(self):
927 # Truncate implicitly flushes the buffer.
928 with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
929 bufio = self.tp(raw, 8)
930 bufio.write(b"abcdef")
931 self.assertEqual(bufio.truncate(3), 3)
932 self.assertEqual(bufio.tell(), 3)
933 with io.open(support.TESTFN, "rb", buffering=0) as f:
934 self.assertEqual(f.read(), b"abc")
935
936 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000937 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000938 # Write out many bytes from many threads and test they were
939 # all flushed.
940 N = 1000
941 contents = bytes(range(256)) * N
942 sizes = cycle([1, 19])
943 n = 0
944 queue = deque()
945 while n < len(contents):
946 size = next(sizes)
947 queue.append(contents[n:n+size])
948 n += size
949 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +0000950 # We use a real file object because it allows us to
951 # exercise situations where the GIL is released before
952 # writing the buffer to the raw streams. This is in addition
953 # to concurrency issues due to switching threads in the middle
954 # of Python code.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000955 with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
956 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000957 errors = []
958 def f():
959 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000960 while True:
961 try:
962 s = queue.popleft()
963 except IndexError:
964 return
Antoine Pitrou87695762008-08-14 22:44:29 +0000965 bufio.write(s)
966 except Exception as e:
967 errors.append(e)
968 raise
969 threads = [threading.Thread(target=f) for x in range(20)]
970 for t in threads:
971 t.start()
972 time.sleep(0.02) # yield
973 for t in threads:
974 t.join()
975 self.assertFalse(errors,
976 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000977 bufio.close()
978 with io.open(support.TESTFN, "rb") as f:
979 s = f.read()
980 for i in range(256):
981 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +0000982 finally:
983 support.unlink(support.TESTFN)
984
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000985 def test_misbehaved_io(self):
986 rawio = self.MisbehavedRawIO()
987 bufio = self.tp(rawio, 5)
988 self.assertRaises(IOError, bufio.seek, 0)
989 self.assertRaises(IOError, bufio.tell)
990 self.assertRaises(IOError, bufio.write, b"abcdef")
991
Benjamin Peterson59406a92009-03-26 17:10:29 +0000992 def test_max_buffer_size_deprecation(self):
993 with support.check_warnings() as w:
994 warnings.simplefilter("always", DeprecationWarning)
995 self.tp(self.MockRawIO(), 8, 12)
996 self.assertEqual(len(w.warnings), 1)
997 warning = w.warnings[0]
998 self.assertTrue(warning.category is DeprecationWarning)
999 self.assertEqual(str(warning.message),
1000 "max_buffer_size is deprecated")
1001
1002
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001003class CBufferedWriterTest(BufferedWriterTest):
1004 tp = io.BufferedWriter
1005
1006 def test_constructor(self):
1007 BufferedWriterTest.test_constructor(self)
1008 # The allocation can succeed on 32-bit builds, e.g. with more
1009 # than 2GB RAM and a 64-bit kernel.
1010 if sys.maxsize > 0x7FFFFFFF:
1011 rawio = self.MockRawIO()
1012 bufio = self.tp(rawio)
1013 self.assertRaises((OverflowError, MemoryError, ValueError),
1014 bufio.__init__, rawio, sys.maxsize)
1015
1016 def test_initialization(self):
1017 rawio = self.MockRawIO()
1018 bufio = self.tp(rawio)
1019 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1020 self.assertRaises(ValueError, bufio.write, b"def")
1021 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1022 self.assertRaises(ValueError, bufio.write, b"def")
1023 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1024 self.assertRaises(ValueError, bufio.write, b"def")
1025
1026 def test_garbage_collection(self):
1027 # C BufferedWriter objects are collected, and collecting them flushes
1028 # all data to disk.
1029 # The Python version has __del__, so it ends into gc.garbage instead
1030 rawio = self.FileIO(support.TESTFN, "w+b")
1031 f = self.tp(rawio)
1032 f.write(b"123xxx")
1033 f.x = f
1034 wr = weakref.ref(f)
1035 del f
1036 gc.collect()
1037 self.assert_(wr() is None, wr)
1038 with open(support.TESTFN, "rb") as f:
1039 self.assertEqual(f.read(), b"123xxx")
1040
1041
1042class PyBufferedWriterTest(BufferedWriterTest):
1043 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001044
Guido van Rossum01a27522007-03-07 01:00:12 +00001045class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001046
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001047 def test_constructor(self):
1048 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001049 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001050
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001051 def test_constructor_max_buffer_size_deprecation(self):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001052 with support.check_warnings() as w:
1053 warnings.simplefilter("always", DeprecationWarning)
1054 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1055 self.assertEqual(len(w.warnings), 1)
1056 warning = w.warnings[0]
1057 self.assertTrue(warning.category is DeprecationWarning)
1058 self.assertEqual(str(warning.message),
1059 "max_buffer_size is deprecated")
1060
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001061 def test_constructor_with_not_readable(self):
1062 class NotReadable(MockRawIO):
1063 def readable(self):
1064 return False
1065
1066 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1067
1068 def test_constructor_with_not_writeable(self):
1069 class NotWriteable(MockRawIO):
1070 def writable(self):
1071 return False
1072
1073 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1074
1075 def test_read(self):
1076 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1077
1078 self.assertEqual(pair.read(3), b"abc")
1079 self.assertEqual(pair.read(1), b"d")
1080 self.assertEqual(pair.read(), b"ef")
1081
1082 def test_read1(self):
1083 # .read1() is delegated to the underlying reader object, so this test
1084 # can be shallow.
1085 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1086
1087 self.assertEqual(pair.read1(3), b"abc")
1088
1089 def test_readinto(self):
1090 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1091
1092 data = bytearray(5)
1093 self.assertEqual(pair.readinto(data), 5)
1094 self.assertEqual(data, b"abcde")
1095
1096 def test_write(self):
1097 w = self.MockRawIO()
1098 pair = self.tp(self.MockRawIO(), w)
1099
1100 pair.write(b"abc")
1101 pair.flush()
1102 pair.write(b"def")
1103 pair.flush()
1104 self.assertEqual(w._write_stack, [b"abc", b"def"])
1105
1106 def test_peek(self):
1107 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1108
1109 self.assertTrue(pair.peek(3).startswith(b"abc"))
1110 self.assertEqual(pair.read(3), b"abc")
1111
1112 def test_readable(self):
1113 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1114 self.assertTrue(pair.readable())
1115
1116 def test_writeable(self):
1117 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1118 self.assertTrue(pair.writable())
1119
1120 def test_seekable(self):
1121 # BufferedRWPairs are never seekable, even if their readers and writers
1122 # are.
1123 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1124 self.assertFalse(pair.seekable())
1125
1126 # .flush() is delegated to the underlying writer object and has been
1127 # tested in the test_write method.
1128
1129 def test_close_and_closed(self):
1130 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1131 self.assertFalse(pair.closed)
1132 pair.close()
1133 self.assertTrue(pair.closed)
1134
1135 def test_isatty(self):
1136 class SelectableIsAtty(MockRawIO):
1137 def __init__(self, isatty):
1138 MockRawIO.__init__(self)
1139 self._isatty = isatty
1140
1141 def isatty(self):
1142 return self._isatty
1143
1144 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1145 self.assertFalse(pair.isatty())
1146
1147 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1148 self.assertTrue(pair.isatty())
1149
1150 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1151 self.assertTrue(pair.isatty())
1152
1153 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1154 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001155
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001156class CBufferedRWPairTest(BufferedRWPairTest):
1157 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001158
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001159class PyBufferedRWPairTest(BufferedRWPairTest):
1160 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001161
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001162
1163class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1164 read_mode = "rb+"
1165 write_mode = "wb+"
1166
1167 def test_constructor(self):
1168 BufferedReaderTest.test_constructor(self)
1169 BufferedWriterTest.test_constructor(self)
1170
1171 def test_read_and_write(self):
1172 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001173 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001174
1175 self.assertEqual(b"as", rw.read(2))
1176 rw.write(b"ddd")
1177 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001178 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001179 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001180 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001181
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001182 def test_seek_and_tell(self):
1183 raw = self.BytesIO(b"asdfghjkl")
1184 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001185
1186 self.assertEquals(b"as", rw.read(2))
1187 self.assertEquals(2, rw.tell())
1188 rw.seek(0, 0)
1189 self.assertEquals(b"asdf", rw.read(4))
1190
1191 rw.write(b"asdf")
1192 rw.seek(0, 0)
1193 self.assertEquals(b"asdfasdfl", rw.read())
1194 self.assertEquals(9, rw.tell())
1195 rw.seek(-4, 2)
1196 self.assertEquals(5, rw.tell())
1197 rw.seek(2, 1)
1198 self.assertEquals(7, rw.tell())
1199 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001200 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001201
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001202 def check_flush_and_read(self, read_func):
1203 raw = self.BytesIO(b"abcdefghi")
1204 bufio = self.tp(raw)
1205
1206 self.assertEquals(b"ab", read_func(bufio, 2))
1207 bufio.write(b"12")
1208 self.assertEquals(b"ef", read_func(bufio, 2))
1209 self.assertEquals(6, bufio.tell())
1210 bufio.flush()
1211 self.assertEquals(6, bufio.tell())
1212 self.assertEquals(b"ghi", read_func(bufio))
1213 raw.seek(0, 0)
1214 raw.write(b"XYZ")
1215 # flush() resets the read buffer
1216 bufio.flush()
1217 bufio.seek(0, 0)
1218 self.assertEquals(b"XYZ", read_func(bufio, 3))
1219
1220 def test_flush_and_read(self):
1221 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1222
1223 def test_flush_and_readinto(self):
1224 def _readinto(bufio, n=-1):
1225 b = bytearray(n if n >= 0 else 9999)
1226 n = bufio.readinto(b)
1227 return bytes(b[:n])
1228 self.check_flush_and_read(_readinto)
1229
1230 def test_flush_and_peek(self):
1231 def _peek(bufio, n=-1):
1232 # This relies on the fact that the buffer can contain the whole
1233 # raw stream, otherwise peek() can return less.
1234 b = bufio.peek(n)
1235 if n != -1:
1236 b = b[:n]
1237 bufio.seek(len(b), 1)
1238 return b
1239 self.check_flush_and_read(_peek)
1240
1241 def test_flush_and_write(self):
1242 raw = self.BytesIO(b"abcdefghi")
1243 bufio = self.tp(raw)
1244
1245 bufio.write(b"123")
1246 bufio.flush()
1247 bufio.write(b"45")
1248 bufio.flush()
1249 bufio.seek(0, 0)
1250 self.assertEquals(b"12345fghi", raw.getvalue())
1251 self.assertEquals(b"12345fghi", bufio.read())
1252
1253 def test_threads(self):
1254 BufferedReaderTest.test_threads(self)
1255 BufferedWriterTest.test_threads(self)
1256
1257 def test_writes_and_peek(self):
1258 def _peek(bufio):
1259 bufio.peek(1)
1260 self.check_writes(_peek)
1261 def _peek(bufio):
1262 pos = bufio.tell()
1263 bufio.seek(-1, 1)
1264 bufio.peek(1)
1265 bufio.seek(pos, 0)
1266 self.check_writes(_peek)
1267
1268 def test_writes_and_reads(self):
1269 def _read(bufio):
1270 bufio.seek(-1, 1)
1271 bufio.read(1)
1272 self.check_writes(_read)
1273
1274 def test_writes_and_read1s(self):
1275 def _read1(bufio):
1276 bufio.seek(-1, 1)
1277 bufio.read1(1)
1278 self.check_writes(_read1)
1279
1280 def test_writes_and_readintos(self):
1281 def _read(bufio):
1282 bufio.seek(-1, 1)
1283 bufio.readinto(bytearray(1))
1284 self.check_writes(_read)
1285
1286 def test_misbehaved_io(self):
1287 BufferedReaderTest.test_misbehaved_io(self)
1288 BufferedWriterTest.test_misbehaved_io(self)
1289
1290class CBufferedRandomTest(BufferedRandomTest):
1291 tp = io.BufferedRandom
1292
1293 def test_constructor(self):
1294 BufferedRandomTest.test_constructor(self)
1295 # The allocation can succeed on 32-bit builds, e.g. with more
1296 # than 2GB RAM and a 64-bit kernel.
1297 if sys.maxsize > 0x7FFFFFFF:
1298 rawio = self.MockRawIO()
1299 bufio = self.tp(rawio)
1300 self.assertRaises((OverflowError, MemoryError, ValueError),
1301 bufio.__init__, rawio, sys.maxsize)
1302
1303 def test_garbage_collection(self):
1304 CBufferedReaderTest.test_garbage_collection(self)
1305 CBufferedWriterTest.test_garbage_collection(self)
1306
1307class PyBufferedRandomTest(BufferedRandomTest):
1308 tp = pyio.BufferedRandom
1309
1310
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001311# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1312# properties:
1313# - A single output character can correspond to many bytes of input.
1314# - The number of input bytes to complete the character can be
1315# undetermined until the last input byte is received.
1316# - The number of input bytes can vary depending on previous input.
1317# - A single input byte can correspond to many characters of output.
1318# - The number of output characters can be undetermined until the
1319# last input byte is received.
1320# - The number of output characters can vary depending on previous input.
1321
1322class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1323 """
1324 For testing seek/tell behavior with a stateful, buffering decoder.
1325
1326 Input is a sequence of words. Words may be fixed-length (length set
1327 by input) or variable-length (period-terminated). In variable-length
1328 mode, extra periods are ignored. Possible words are:
1329 - 'i' followed by a number sets the input length, I (maximum 99).
1330 When I is set to 0, words are space-terminated.
1331 - 'o' followed by a number sets the output length, O (maximum 99).
1332 - Any other word is converted into a word followed by a period on
1333 the output. The output word consists of the input word truncated
1334 or padded out with hyphens to make its length equal to O. If O
1335 is 0, the word is output verbatim without truncating or padding.
1336 I and O are initially set to 1. When I changes, any buffered input is
1337 re-scanned according to the new I. EOF also terminates the last word.
1338 """
1339
1340 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001341 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001342 self.reset()
1343
1344 def __repr__(self):
1345 return '<SID %x>' % id(self)
1346
1347 def reset(self):
1348 self.i = 1
1349 self.o = 1
1350 self.buffer = bytearray()
1351
1352 def getstate(self):
1353 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1354 return bytes(self.buffer), i*100 + o
1355
1356 def setstate(self, state):
1357 buffer, io = state
1358 self.buffer = bytearray(buffer)
1359 i, o = divmod(io, 100)
1360 self.i, self.o = i ^ 1, o ^ 1
1361
1362 def decode(self, input, final=False):
1363 output = ''
1364 for b in input:
1365 if self.i == 0: # variable-length, terminated with period
1366 if b == ord('.'):
1367 if self.buffer:
1368 output += self.process_word()
1369 else:
1370 self.buffer.append(b)
1371 else: # fixed-length, terminate after self.i bytes
1372 self.buffer.append(b)
1373 if len(self.buffer) == self.i:
1374 output += self.process_word()
1375 if final and self.buffer: # EOF terminates the last word
1376 output += self.process_word()
1377 return output
1378
1379 def process_word(self):
1380 output = ''
1381 if self.buffer[0] == ord('i'):
1382 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1383 elif self.buffer[0] == ord('o'):
1384 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1385 else:
1386 output = self.buffer.decode('ascii')
1387 if len(output) < self.o:
1388 output += '-'*self.o # pad out with hyphens
1389 if self.o:
1390 output = output[:self.o] # truncate to output length
1391 output += '.'
1392 self.buffer = bytearray()
1393 return output
1394
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001395 codecEnabled = False
1396
1397 @classmethod
1398 def lookupTestDecoder(cls, name):
1399 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001400 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001401 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001402 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001403 incrementalencoder=None,
1404 streamreader=None, streamwriter=None,
1405 incrementaldecoder=cls)
1406
1407# Register the previous decoder for testing.
1408# Disabled by default, tests will enable it.
1409codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1410
1411
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001412class StatefulIncrementalDecoderTest(unittest.TestCase):
1413 """
1414 Make sure the StatefulIncrementalDecoder actually works.
1415 """
1416
1417 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001418 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001419 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001420 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001421 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001422 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001423 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001424 # I=0, O=6 (variable-length input, fixed-length output)
1425 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1426 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001427 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001428 # I=6, O=3 (fixed-length input > fixed-length output)
1429 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1430 # I=0, then 3; O=29, then 15 (with longer output)
1431 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1432 'a----------------------------.' +
1433 'b----------------------------.' +
1434 'cde--------------------------.' +
1435 'abcdefghijabcde.' +
1436 'a.b------------.' +
1437 '.c.------------.' +
1438 'd.e------------.' +
1439 'k--------------.' +
1440 'l--------------.' +
1441 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001442 ]
1443
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001444 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001445 # Try a few one-shot test cases.
1446 for input, eof, output in self.test_cases:
1447 d = StatefulIncrementalDecoder()
1448 self.assertEquals(d.decode(input, eof), output)
1449
1450 # Also test an unfinished decode, followed by forcing EOF.
1451 d = StatefulIncrementalDecoder()
1452 self.assertEquals(d.decode(b'oiabcd'), '')
1453 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001454
1455class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001456
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001457 def setUp(self):
1458 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1459 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001460 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001461
Guido van Rossumd0712812007-04-11 16:32:43 +00001462 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001463 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001464
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001465 def test_constructor(self):
1466 r = self.BytesIO(b"\xc3\xa9\n\n")
1467 b = self.BufferedReader(r, 1000)
1468 t = self.TextIOWrapper(b)
1469 t.__init__(b, encoding="latin1", newline="\r\n")
1470 self.assertEquals(t.encoding, "latin1")
1471 self.assertEquals(t.line_buffering, False)
1472 t.__init__(b, encoding="utf8", line_buffering=True)
1473 self.assertEquals(t.encoding, "utf8")
1474 self.assertEquals(t.line_buffering, True)
1475 self.assertEquals("\xe9\n", t.readline())
1476 self.assertRaises(TypeError, t.__init__, b, newline=42)
1477 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1478
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001479 def test_repr(self):
1480 raw = self.BytesIO("hello".encode("utf-8"))
1481 b = self.BufferedReader(raw)
1482 t = self.TextIOWrapper(b, encoding="utf-8")
1483 self.assertEqual(repr(t), "<TextIOWrapper encoding=utf-8>")
1484
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001485 def test_line_buffering(self):
1486 r = self.BytesIO()
1487 b = self.BufferedWriter(r, 1000)
1488 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001489 t.write("X")
1490 self.assertEquals(r.getvalue(), b"") # No flush happened
1491 t.write("Y\nZ")
1492 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1493 t.write("A\rB")
1494 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1495
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001496 def test_encoding(self):
1497 # Check the encoding attribute is always set, and valid
1498 b = self.BytesIO()
1499 t = self.TextIOWrapper(b, encoding="utf8")
1500 self.assertEqual(t.encoding, "utf8")
1501 t = self.TextIOWrapper(b)
1502 self.assert_(t.encoding is not None)
1503 codecs.lookup(t.encoding)
1504
1505 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001506 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001507 b = self.BytesIO(b"abc\n\xff\n")
1508 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001509 self.assertRaises(UnicodeError, t.read)
1510 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001511 b = self.BytesIO(b"abc\n\xff\n")
1512 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001513 self.assertRaises(UnicodeError, t.read)
1514 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001515 b = self.BytesIO(b"abc\n\xff\n")
1516 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001517 self.assertEquals(t.read(), "abc\n\n")
1518 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001519 b = self.BytesIO(b"abc\n\xff\n")
1520 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001521 self.assertEquals(t.read(), "abc\n\ufffd\n")
1522
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001523 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001524 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001525 b = self.BytesIO()
1526 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001527 self.assertRaises(UnicodeError, t.write, "\xff")
1528 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001529 b = self.BytesIO()
1530 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001531 self.assertRaises(UnicodeError, t.write, "\xff")
1532 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001533 b = self.BytesIO()
1534 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001535 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001536 t.write("abc\xffdef\n")
1537 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001538 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001539 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001540 b = self.BytesIO()
1541 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001542 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001543 t.write("abc\xffdef\n")
1544 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001545 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001546
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001547 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001548 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1549
1550 tests = [
1551 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001552 [ '', input_lines ],
1553 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1554 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1555 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001556 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001557 encodings = (
1558 'utf-8', 'latin-1',
1559 'utf-16', 'utf-16-le', 'utf-16-be',
1560 'utf-32', 'utf-32-le', 'utf-32-be',
1561 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001562
Guido van Rossum8358db22007-08-18 21:39:55 +00001563 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001564 # character in TextIOWrapper._pending_line.
1565 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001566 # XXX: str.encode() should return bytes
1567 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001568 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001569 for bufsize in range(1, 10):
1570 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001571 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1572 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001573 encoding=encoding)
1574 if do_reads:
1575 got_lines = []
1576 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001577 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001578 if c2 == '':
1579 break
1580 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001581 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001582 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001583 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001584
1585 for got_line, exp_line in zip(got_lines, exp_lines):
1586 self.assertEquals(got_line, exp_line)
1587 self.assertEquals(len(got_lines), len(exp_lines))
1588
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001589 def test_newlines_input(self):
1590 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001591 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1592 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001593 (None, normalized.decode("ascii").splitlines(True)),
1594 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001595 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1596 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1597 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001598 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001599 buf = self.BytesIO(testdata)
1600 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001601 self.assertEquals(txt.readlines(), expected)
1602 txt.seek(0)
1603 self.assertEquals(txt.read(), "".join(expected))
1604
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001605 def test_newlines_output(self):
1606 testdict = {
1607 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1608 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1609 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1610 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1611 }
1612 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1613 for newline, expected in tests:
1614 buf = self.BytesIO()
1615 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1616 txt.write("AAA\nB")
1617 txt.write("BB\nCCC\n")
1618 txt.write("X\rY\r\nZ")
1619 txt.flush()
1620 self.assertEquals(buf.closed, False)
1621 self.assertEquals(buf.getvalue(), expected)
1622
1623 def test_destructor(self):
1624 l = []
1625 base = self.BytesIO
1626 class MyBytesIO(base):
1627 def close(self):
1628 l.append(self.getvalue())
1629 base.close(self)
1630 b = MyBytesIO()
1631 t = self.TextIOWrapper(b, encoding="ascii")
1632 t.write("abc")
1633 del t
1634 self.assertEquals([b"abc"], l)
1635
1636 def test_override_destructor(self):
1637 record = []
1638 class MyTextIO(self.TextIOWrapper):
1639 def __del__(self):
1640 record.append(1)
1641 try:
1642 f = super().__del__
1643 except AttributeError:
1644 pass
1645 else:
1646 f()
1647 def close(self):
1648 record.append(2)
1649 super().close()
1650 def flush(self):
1651 record.append(3)
1652 super().flush()
1653 b = self.BytesIO()
1654 t = MyTextIO(b, encoding="ascii")
1655 del t
1656 self.assertEqual(record, [1, 2, 3])
1657
1658 def test_error_through_destructor(self):
1659 # Test that the exception state is not modified by a destructor,
1660 # even if close() fails.
1661 rawio = self.CloseFailureIO()
1662 def f():
1663 self.TextIOWrapper(rawio).xyzzy
1664 with support.captured_output("stderr") as s:
1665 self.assertRaises(AttributeError, f)
1666 s = s.getvalue().strip()
1667 if s:
1668 # The destructor *may* have printed an unraisable error, check it
1669 self.assertEqual(len(s.splitlines()), 1)
1670 self.assert_(s.startswith("Exception IOError: "), s)
1671 self.assert_(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001672
Guido van Rossum9b76da62007-04-11 01:09:03 +00001673 # Systematic tests of the text I/O API
1674
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001675 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001676 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1677 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001678 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001679 f._CHUNK_SIZE = chunksize
1680 self.assertEquals(f.write("abc"), 3)
1681 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001682 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001683 f._CHUNK_SIZE = chunksize
1684 self.assertEquals(f.tell(), 0)
1685 self.assertEquals(f.read(), "abc")
1686 cookie = f.tell()
1687 self.assertEquals(f.seek(0), 0)
1688 self.assertEquals(f.read(2), "ab")
1689 self.assertEquals(f.read(1), "c")
1690 self.assertEquals(f.read(1), "")
1691 self.assertEquals(f.read(), "")
1692 self.assertEquals(f.tell(), cookie)
1693 self.assertEquals(f.seek(0), 0)
1694 self.assertEquals(f.seek(0, 2), cookie)
1695 self.assertEquals(f.write("def"), 3)
1696 self.assertEquals(f.seek(cookie), cookie)
1697 self.assertEquals(f.read(), "def")
1698 if enc.startswith("utf"):
1699 self.multi_line_test(f, enc)
1700 f.close()
1701
1702 def multi_line_test(self, f, enc):
1703 f.seek(0)
1704 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001705 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001706 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001707 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001708 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001709 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001710 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001711 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001712 wlines.append((f.tell(), line))
1713 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001714 f.seek(0)
1715 rlines = []
1716 while True:
1717 pos = f.tell()
1718 line = f.readline()
1719 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001720 break
1721 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +00001722 self.assertEquals(rlines, wlines)
1723
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001724 def test_telling(self):
1725 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001726 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001727 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001728 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001729 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001730 p2 = f.tell()
1731 f.seek(0)
1732 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001733 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001734 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001735 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001736 self.assertEquals(f.tell(), p2)
1737 f.seek(0)
1738 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001739 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001740 self.assertRaises(IOError, f.tell)
1741 self.assertEquals(f.tell(), p2)
1742 f.close()
1743
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001744 def test_seeking(self):
1745 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001746 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001747 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001748 prefix = bytes(u_prefix.encode("utf-8"))
1749 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001750 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001751 suffix = bytes(u_suffix.encode("utf-8"))
1752 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001753 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001754 f.write(line*2)
1755 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001756 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001757 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001758 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001759 self.assertEquals(f.tell(), prefix_size)
1760 self.assertEquals(f.readline(), u_suffix)
1761
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001762 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00001763 # Regression test for a specific bug
1764 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001765 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001766 f.write(data)
1767 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001768 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001769 f._CHUNK_SIZE # Just test that it exists
1770 f._CHUNK_SIZE = 2
1771 f.readline()
1772 f.tell()
1773
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001774 def test_seek_and_tell(self):
1775 #Test seek/tell using the StatefulIncrementalDecoder.
1776 # Make test faster by doing smaller seeks
1777 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001778
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001779 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001780 """Tell/seek to various points within a data stream and ensure
1781 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001782 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001783 f.write(data)
1784 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001785 f = self.open(support.TESTFN, encoding='test_decoder')
1786 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001787 decoded = f.read()
1788 f.close()
1789
Neal Norwitze2b07052008-03-18 19:52:05 +00001790 for i in range(min_pos, len(decoded) + 1): # seek positions
1791 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001792 f = self.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001793 self.assertEquals(f.read(i), decoded[:i])
1794 cookie = f.tell()
1795 self.assertEquals(f.read(j), decoded[i:i + j])
1796 f.seek(cookie)
1797 self.assertEquals(f.read(), decoded[i:])
1798 f.close()
1799
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001800 # Enable the test decoder.
1801 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001802
1803 # Run the tests.
1804 try:
1805 # Try each test case.
1806 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001807 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001808
1809 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001810 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1811 offset = CHUNK_SIZE - len(input)//2
1812 prefix = b'.'*offset
1813 # Don't bother seeking into the prefix (takes too long).
1814 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001815 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001816
1817 # Ensure our test decoder won't interfere with subsequent tests.
1818 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001819 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001820
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001821 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001822 data = "1234567890"
1823 tests = ("utf-16",
1824 "utf-16-le",
1825 "utf-16-be",
1826 "utf-32",
1827 "utf-32-le",
1828 "utf-32-be")
1829 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001830 buf = self.BytesIO()
1831 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001832 # Check if the BOM is written only once (see issue1753).
1833 f.write(data)
1834 f.write(data)
1835 f.seek(0)
1836 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00001837 f.seek(0)
1838 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001839 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1840
Benjamin Petersona1b49012009-03-31 23:11:32 +00001841 def test_unreadable(self):
1842 class UnReadable(self.BytesIO):
1843 def readable(self):
1844 return False
1845 txt = self.TextIOWrapper(UnReadable())
1846 self.assertRaises(IOError, txt.read)
1847
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001848 def test_read_one_by_one(self):
1849 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001850 reads = ""
1851 while True:
1852 c = txt.read(1)
1853 if not c:
1854 break
1855 reads += c
1856 self.assertEquals(reads, "AA\nBB")
1857
1858 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001859 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001860 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001861 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001862 reads = ""
1863 while True:
1864 c = txt.read(128)
1865 if not c:
1866 break
1867 reads += c
1868 self.assertEquals(reads, "A"*127+"\nB")
1869
1870 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001871 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001872
1873 # read one char at a time
1874 reads = ""
1875 while True:
1876 c = txt.read(1)
1877 if not c:
1878 break
1879 reads += c
1880 self.assertEquals(reads, self.normalized)
1881
1882 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001883 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001884 txt._CHUNK_SIZE = 4
1885
1886 reads = ""
1887 while True:
1888 c = txt.read(4)
1889 if not c:
1890 break
1891 reads += c
1892 self.assertEquals(reads, self.normalized)
1893
1894 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001895 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001896 txt._CHUNK_SIZE = 4
1897
1898 reads = txt.read(4)
1899 reads += txt.read(4)
1900 reads += txt.readline()
1901 reads += txt.readline()
1902 reads += txt.readline()
1903 self.assertEquals(reads, self.normalized)
1904
1905 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001906 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001907 txt._CHUNK_SIZE = 4
1908
1909 reads = txt.read(4)
1910 reads += txt.read()
1911 self.assertEquals(reads, self.normalized)
1912
1913 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001914 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001915 txt._CHUNK_SIZE = 4
1916
1917 reads = txt.read(4)
1918 pos = txt.tell()
1919 txt.seek(0)
1920 txt.seek(pos)
1921 self.assertEquals(txt.read(4), "BBB\n")
1922
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001923 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001924 buffer = self.BytesIO(self.testdata)
1925 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001926
1927 self.assertEqual(buffer.seekable(), txt.seekable())
1928
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001929class CTextIOWrapperTest(TextIOWrapperTest):
1930
1931 def test_initialization(self):
1932 r = self.BytesIO(b"\xc3\xa9\n\n")
1933 b = self.BufferedReader(r, 1000)
1934 t = self.TextIOWrapper(b)
1935 self.assertRaises(TypeError, t.__init__, b, newline=42)
1936 self.assertRaises(ValueError, t.read)
1937 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1938 self.assertRaises(ValueError, t.read)
1939
1940 def test_garbage_collection(self):
1941 # C TextIOWrapper objects are collected, and collecting them flushes
1942 # all data to disk.
1943 # The Python version has __del__, so it ends in gc.garbage instead.
1944 rawio = io.FileIO(support.TESTFN, "wb")
1945 b = self.BufferedWriter(rawio)
1946 t = self.TextIOWrapper(b, encoding="ascii")
1947 t.write("456def")
1948 t.x = t
1949 wr = weakref.ref(t)
1950 del t
1951 gc.collect()
1952 self.assert_(wr() is None, wr)
1953 with open(support.TESTFN, "rb") as f:
1954 self.assertEqual(f.read(), b"456def")
1955
1956class PyTextIOWrapperTest(TextIOWrapperTest):
1957 pass
1958
1959
1960class IncrementalNewlineDecoderTest(unittest.TestCase):
1961
1962 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00001963 # UTF-8 specific tests for a newline decoder
1964 def _check_decode(b, s, **kwargs):
1965 # We exercise getstate() / setstate() as well as decode()
1966 state = decoder.getstate()
1967 self.assertEquals(decoder.decode(b, **kwargs), s)
1968 decoder.setstate(state)
1969 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001970
Antoine Pitrou180a3362008-12-14 16:36:46 +00001971 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001972
Antoine Pitrou180a3362008-12-14 16:36:46 +00001973 _check_decode(b'\xe8', "")
1974 _check_decode(b'\xa2', "")
1975 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001976
Antoine Pitrou180a3362008-12-14 16:36:46 +00001977 _check_decode(b'\xe8', "")
1978 _check_decode(b'\xa2', "")
1979 _check_decode(b'\x88', "\u8888")
1980
1981 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001982 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1983
Antoine Pitrou180a3362008-12-14 16:36:46 +00001984 decoder.reset()
1985 _check_decode(b'\n', "\n")
1986 _check_decode(b'\r', "")
1987 _check_decode(b'', "\n", final=True)
1988 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001989
Antoine Pitrou180a3362008-12-14 16:36:46 +00001990 _check_decode(b'\r', "")
1991 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001992
Antoine Pitrou180a3362008-12-14 16:36:46 +00001993 _check_decode(b'\r\r\n', "\n\n")
1994 _check_decode(b'\r', "")
1995 _check_decode(b'\r', "\n")
1996 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001997
Antoine Pitrou180a3362008-12-14 16:36:46 +00001998 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
1999 _check_decode(b'\xe8\xa2\x88', "\u8888")
2000 _check_decode(b'\n', "\n")
2001 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2002 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002003
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002004 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002005 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002006 if encoding is not None:
2007 encoder = codecs.getincrementalencoder(encoding)()
2008 def _decode_bytewise(s):
2009 # Decode one byte at a time
2010 for b in encoder.encode(s):
2011 result.append(decoder.decode(bytes([b])))
2012 else:
2013 encoder = None
2014 def _decode_bytewise(s):
2015 # Decode one char at a time
2016 for c in s:
2017 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002018 self.assertEquals(decoder.newlines, None)
2019 _decode_bytewise("abc\n\r")
2020 self.assertEquals(decoder.newlines, '\n')
2021 _decode_bytewise("\nabc")
2022 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2023 _decode_bytewise("abc\r")
2024 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2025 _decode_bytewise("abc")
2026 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2027 _decode_bytewise("abc\r")
2028 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2029 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002030 input = "abc"
2031 if encoder is not None:
2032 encoder.reset()
2033 input = encoder.encode(input)
2034 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002035 self.assertEquals(decoder.newlines, None)
2036
2037 def test_newline_decoder(self):
2038 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002039 # None meaning the IncrementalNewlineDecoder takes unicode input
2040 # rather than bytes input
2041 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002042 'utf-16', 'utf-16-le', 'utf-16-be',
2043 'utf-32', 'utf-32-le', 'utf-32-be',
2044 )
2045 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002046 decoder = enc and codecs.getincrementaldecoder(enc)()
2047 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2048 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002049 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002050 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2051 self.check_newline_decoding_utf8(decoder)
2052
Antoine Pitrou66913e22009-03-06 23:40:56 +00002053 def test_newline_bytes(self):
2054 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2055 def _check(dec):
2056 self.assertEquals(dec.newlines, None)
2057 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2058 self.assertEquals(dec.newlines, None)
2059 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2060 self.assertEquals(dec.newlines, None)
2061 dec = self.IncrementalNewlineDecoder(None, translate=False)
2062 _check(dec)
2063 dec = self.IncrementalNewlineDecoder(None, translate=True)
2064 _check(dec)
2065
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002066class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2067 pass
2068
2069class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2070 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002071
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002072
Guido van Rossum01a27522007-03-07 01:00:12 +00002073# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002074
Guido van Rossum5abbf752007-08-27 17:39:33 +00002075class MiscIOTest(unittest.TestCase):
2076
Barry Warsaw40e82462008-11-20 20:14:50 +00002077 def tearDown(self):
2078 support.unlink(support.TESTFN)
2079
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002080 def test___all__(self):
2081 for name in self.io.__all__:
2082 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002083 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002084 if name == "open":
2085 continue
2086 elif "error" in name.lower():
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002087 self.assertTrue(issubclass(obj, Exception), name)
2088 elif not name.startswith("SEEK_"):
2089 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002090
Barry Warsaw40e82462008-11-20 20:14:50 +00002091 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002092 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002093 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002094 f.close()
2095
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002096 f = self.open(support.TESTFN, "U")
Barry Warsaw40e82462008-11-20 20:14:50 +00002097 self.assertEquals(f.name, support.TESTFN)
2098 self.assertEquals(f.buffer.name, support.TESTFN)
2099 self.assertEquals(f.buffer.raw.name, support.TESTFN)
2100 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002101 self.assertEquals(f.buffer.mode, "rb")
2102 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002103 f.close()
2104
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002105 f = self.open(support.TESTFN, "w+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002106 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002107 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2108 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002109
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002110 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002111 self.assertEquals(g.mode, "wb")
2112 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002113 self.assertEquals(g.name, f.fileno())
2114 self.assertEquals(g.raw.name, f.fileno())
2115 f.close()
2116 g.close()
2117
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002118 def test_io_after_close(self):
2119 for kwargs in [
2120 {"mode": "w"},
2121 {"mode": "wb"},
2122 {"mode": "w", "buffering": 1},
2123 {"mode": "w", "buffering": 2},
2124 {"mode": "wb", "buffering": 0},
2125 {"mode": "r"},
2126 {"mode": "rb"},
2127 {"mode": "r", "buffering": 1},
2128 {"mode": "r", "buffering": 2},
2129 {"mode": "rb", "buffering": 0},
2130 {"mode": "w+"},
2131 {"mode": "w+b"},
2132 {"mode": "w+", "buffering": 1},
2133 {"mode": "w+", "buffering": 2},
2134 {"mode": "w+b", "buffering": 0},
2135 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002136 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002137 f.close()
2138 self.assertRaises(ValueError, f.flush)
2139 self.assertRaises(ValueError, f.fileno)
2140 self.assertRaises(ValueError, f.isatty)
2141 self.assertRaises(ValueError, f.__iter__)
2142 if hasattr(f, "peek"):
2143 self.assertRaises(ValueError, f.peek, 1)
2144 self.assertRaises(ValueError, f.read)
2145 if hasattr(f, "read1"):
2146 self.assertRaises(ValueError, f.read1, 1024)
2147 if hasattr(f, "readinto"):
2148 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2149 self.assertRaises(ValueError, f.readline)
2150 self.assertRaises(ValueError, f.readlines)
2151 self.assertRaises(ValueError, f.seek, 0)
2152 self.assertRaises(ValueError, f.tell)
2153 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002154 self.assertRaises(ValueError, f.write,
2155 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002156 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002157 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002158
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002159 def test_blockingioerror(self):
2160 # Various BlockingIOError issues
2161 self.assertRaises(TypeError, self.BlockingIOError)
2162 self.assertRaises(TypeError, self.BlockingIOError, 1)
2163 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2164 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2165 b = self.BlockingIOError(1, "")
2166 self.assertEqual(b.characters_written, 0)
2167 class C(str):
2168 pass
2169 c = C("")
2170 b = self.BlockingIOError(1, c)
2171 c.b = b
2172 b.c = c
2173 wr = weakref.ref(c)
2174 del c, b
2175 gc.collect()
2176 self.assert_(wr() is None, wr)
2177
2178 def test_abcs(self):
2179 # Test the visible base classes are ABCs.
2180 self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
2181 self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
2182 self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
2183 self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))
2184
2185 def _check_abc_inheritance(self, abcmodule):
2186 with self.open(support.TESTFN, "wb", buffering=0) as f:
2187 self.assertTrue(isinstance(f, abcmodule.IOBase))
2188 self.assertTrue(isinstance(f, abcmodule.RawIOBase))
2189 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2190 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2191 with self.open(support.TESTFN, "wb") as f:
2192 self.assertTrue(isinstance(f, abcmodule.IOBase))
2193 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2194 self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
2195 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2196 with self.open(support.TESTFN, "w") as f:
2197 self.assertTrue(isinstance(f, abcmodule.IOBase))
2198 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2199 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2200 self.assertTrue(isinstance(f, abcmodule.TextIOBase))
2201
2202 def test_abc_inheritance(self):
2203 # Test implementations inherit from their respective ABCs
2204 self._check_abc_inheritance(self)
2205
2206 def test_abc_inheritance_official(self):
2207 # Test implementations inherit from the official ABCs of the
2208 # baseline "io" module.
2209 self._check_abc_inheritance(io)
2210
2211class CMiscIOTest(MiscIOTest):
2212 io = io
2213
2214class PyMiscIOTest(MiscIOTest):
2215 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002216
Guido van Rossum28524c72007-02-27 05:47:44 +00002217def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002218 tests = (CIOTest, PyIOTest,
2219 CBufferedReaderTest, PyBufferedReaderTest,
2220 CBufferedWriterTest, PyBufferedWriterTest,
2221 CBufferedRWPairTest, PyBufferedRWPairTest,
2222 CBufferedRandomTest, PyBufferedRandomTest,
2223 StatefulIncrementalDecoderTest,
2224 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2225 CTextIOWrapperTest, PyTextIOWrapperTest,
2226 CMiscIOTest, PyMiscIOTest,)
2227
2228 # Put the namespaces of the IO module we are testing and some useful mock
2229 # classes in the __dict__ of each test.
2230 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
2231 MockNonBlockWriterIO)
2232 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2233 c_io_ns = {name : getattr(io, name) for name in all_members}
2234 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2235 globs = globals()
2236 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2237 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2238 # Avoid turning open into a bound method.
2239 py_io_ns["open"] = pyio.OpenWrapper
2240 for test in tests:
2241 if test.__name__.startswith("C"):
2242 for name, obj in c_io_ns.items():
2243 setattr(test, name, obj)
2244 elif test.__name__.startswith("Py"):
2245 for name, obj in py_io_ns.items():
2246 setattr(test, name, obj)
2247
2248 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002249
2250if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002251 test_main()