blob: 9ae1d284326851318b7d61648249d56494283833 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
33from itertools import chain, cycle, count
34from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000035from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000036
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000037import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000038import io # C implementation of io
39import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000040
Guido van Rossuma9e20242007-03-08 00:43:48 +000041
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042def _default_chunk_size():
43 """Get the default TextIOWrapper chunk size"""
44 with open(__file__, "r", encoding="latin1") as f:
45 return f._CHUNK_SIZE
46
47
48class MockRawIO:
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000050 def __init__(self, read_stack=()):
51 self._read_stack = list(read_stack)
52 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000053 self._reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000054
55 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000056 self._reads += 1
Guido van Rossum68bbcd22007-02-27 17:19:33 +000057 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000058 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000059 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000060 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000061
Guido van Rossum01a27522007-03-07 01:00:12 +000062 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000063 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000064 return len(b)
65
66 def writable(self):
67 return True
68
Guido van Rossum68bbcd22007-02-27 17:19:33 +000069 def fileno(self):
70 return 42
71
72 def readable(self):
73 return True
74
Guido van Rossum01a27522007-03-07 01:00:12 +000075 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000076 return True
77
Guido van Rossum01a27522007-03-07 01:00:12 +000078 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000079 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000080
81 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000082 return 0 # same comment as above
83
84 def readinto(self, buf):
85 self._reads += 1
86 max_len = len(buf)
87 try:
88 data = self._read_stack[0]
89 except IndexError:
90 return 0
91 if data is None:
92 del self._read_stack[0]
93 return None
94 n = len(data)
95 if len(data) <= max_len:
96 del self._read_stack[0]
97 buf[:n] = data
98 return n
99 else:
100 buf[:] = data[:max_len]
101 self._read_stack[0] = data[max_len:]
102 return max_len
103
104 def truncate(self, pos=None):
105 return pos
106
107class CMockRawIO(MockRawIO, io.RawIOBase):
108 pass
109
110class PyMockRawIO(MockRawIO, pyio.RawIOBase):
111 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000112
Guido van Rossuma9e20242007-03-08 00:43:48 +0000113
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000114class MisbehavedRawIO(MockRawIO):
115 def write(self, b):
116 return super().write(b) * 2
117
118 def read(self, n=None):
119 return super().read(n) * 2
120
121 def seek(self, pos, whence):
122 return -123
123
124 def tell(self):
125 return -456
126
127 def readinto(self, buf):
128 super().readinto(buf)
129 return len(buf) * 5
130
131class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
132 pass
133
134class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
135 pass
136
137
138class CloseFailureIO(MockRawIO):
139 closed = 0
140
141 def close(self):
142 if not self.closed:
143 self.closed = 1
144 raise IOError
145
146class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
147 pass
148
149class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
150 pass
151
152
153class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000154
155 def __init__(self, data):
156 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000157 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000158
159 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000160 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000161 self.read_history.append(None if res is None else len(res))
162 return res
163
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000164 def readinto(self, b):
165 res = super().readinto(b)
166 self.read_history.append(res)
167 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000168
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169class CMockFileIO(MockFileIO, io.BytesIO):
170 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000171
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000172class PyMockFileIO(MockFileIO, pyio.BytesIO):
173 pass
174
175
176class MockNonBlockWriterIO:
177
178 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000179 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000181
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000182 def pop_written(self):
183 s = b"".join(self._write_stack)
184 self._write_stack[:] = []
185 return s
186
187 def block_on(self, char):
188 """Block when a given char is encountered."""
189 self._blocker_char = char
190
191 def readable(self):
192 return True
193
194 def seekable(self):
195 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000196
Guido van Rossum01a27522007-03-07 01:00:12 +0000197 def writable(self):
198 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000200 def write(self, b):
201 b = bytes(b)
202 n = -1
203 if self._blocker_char:
204 try:
205 n = b.index(self._blocker_char)
206 except ValueError:
207 pass
208 else:
209 self._blocker_char = None
210 self._write_stack.append(b[:n])
211 raise self.BlockingIOError(0, "test blocking", n)
212 self._write_stack.append(b)
213 return len(b)
214
215class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
216 BlockingIOError = io.BlockingIOError
217
218class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
219 BlockingIOError = pyio.BlockingIOError
220
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
Guido van Rossum28524c72007-02-27 05:47:44 +0000222class IOTest(unittest.TestCase):
223
Neal Norwitze7789b12008-03-24 06:18:09 +0000224 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000225 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000226
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000227 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000228 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000229
Guido van Rossum28524c72007-02-27 05:47:44 +0000230 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000231 self.assertEqual(f.write(b"blah."), 5)
232 self.assertEqual(f.seek(0), 0)
233 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000234 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000235 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000236 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000237 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000238 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000239 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000240 self.assertEqual(f.seek(-1, 2), 13)
241 self.assertEqual(f.tell(), 13)
242 self.assertEqual(f.truncate(12), 12)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000243 self.assertEqual(f.tell(), 12)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000244 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000245
Guido van Rossum9b76da62007-04-11 01:09:03 +0000246 def read_ops(self, f, buffered=False):
247 data = f.read(5)
248 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000249 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000250 self.assertEqual(f.readinto(data), 5)
251 self.assertEqual(data, b" worl")
252 self.assertEqual(f.readinto(data), 2)
253 self.assertEqual(len(data), 5)
254 self.assertEqual(data[:2], b"d\n")
255 self.assertEqual(f.seek(0), 0)
256 self.assertEqual(f.read(20), b"hello world\n")
257 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000258 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000259 self.assertEqual(f.seek(-6, 2), 6)
260 self.assertEqual(f.read(5), b"world")
261 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000262 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000263 self.assertEqual(f.seek(-6, 1), 5)
264 self.assertEqual(f.read(5), b" worl")
265 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000266 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000267 if buffered:
268 f.seek(0)
269 self.assertEqual(f.read(), b"hello world\n")
270 f.seek(6)
271 self.assertEqual(f.read(), b"world\n")
272 self.assertEqual(f.read(), b"")
273
Guido van Rossum34d69e52007-04-10 20:08:41 +0000274 LARGE = 2**31
275
Guido van Rossum53807da2007-04-10 19:01:47 +0000276 def large_file_ops(self, f):
277 assert f.readable()
278 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000279 self.assertEqual(f.seek(self.LARGE), self.LARGE)
280 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000281 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000282 self.assertEqual(f.tell(), self.LARGE + 3)
283 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000285 self.assertEqual(f.tell(), self.LARGE + 2)
286 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000287 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000288 self.assertEqual(f.tell(), self.LARGE + 1)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000289 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
290 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000291 self.assertEqual(f.read(2), b"x")
292
Guido van Rossum28524c72007-02-27 05:47:44 +0000293 def test_raw_file_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000294 f = self.open(support.TESTFN, "wb", buffering=0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000295 self.assertEqual(f.readable(), False)
296 self.assertEqual(f.writable(), True)
297 self.assertEqual(f.seekable(), True)
298 self.write_ops(f)
299 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000300 f = self.open(support.TESTFN, "rb", buffering=0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000301 self.assertEqual(f.readable(), True)
302 self.assertEqual(f.writable(), False)
303 self.assertEqual(f.seekable(), True)
304 self.read_ops(f)
305 f.close()
306
Guido van Rossum87429772007-04-10 21:06:59 +0000307 def test_buffered_file_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000308 f = self.open(support.TESTFN, "wb")
Guido van Rossum87429772007-04-10 21:06:59 +0000309 self.assertEqual(f.readable(), False)
310 self.assertEqual(f.writable(), True)
311 self.assertEqual(f.seekable(), True)
312 self.write_ops(f)
313 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000314 f = self.open(support.TESTFN, "rb")
Guido van Rossum87429772007-04-10 21:06:59 +0000315 self.assertEqual(f.readable(), True)
316 self.assertEqual(f.writable(), False)
317 self.assertEqual(f.seekable(), True)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000318 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000319 f.close()
320
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000321 def test_readline(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000322 f = io.open(support.TESTFN, "wb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000323 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000324 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000325 f = self.open(support.TESTFN, "rb")
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000326 self.assertEqual(f.readline(), b"abc\n")
327 self.assertEqual(f.readline(10), b"def\n")
328 self.assertEqual(f.readline(2), b"xy")
329 self.assertEqual(f.readline(4), b"zzy\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000330 self.assertEqual(f.readline(), b"foo\x00bar\n")
331 self.assertEqual(f.readline(), b"another line")
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000332 f.close()
333
Guido van Rossum28524c72007-02-27 05:47:44 +0000334 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000335 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000336 self.write_ops(f)
337 data = f.getvalue()
338 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000339 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000340 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000341
Guido van Rossum53807da2007-04-10 19:01:47 +0000342 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000343 # On Windows and Mac OSX this test comsumes large resources; It takes
344 # a long time to build the >2GB file and takes >2GB of disk space
345 # therefore the resource must be enabled to run this test.
346 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000347 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000348 print("\nTesting large file ops skipped on %s." % sys.platform,
349 file=sys.stderr)
350 print("It requires %d bytes and a long time." % self.LARGE,
351 file=sys.stderr)
352 print("Use 'regrtest.py -u largefile test_io' to run it.",
353 file=sys.stderr)
354 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000355 with self.open(support.TESTFN, "w+b", 0) as f:
356 self.large_file_ops(f)
357 with self.open(support.TESTFN, "w+b") as f:
358 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000359
360 def test_with_open(self):
361 for bufsize in (0, 1, 100):
362 f = None
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000363 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000364 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000365 self.assertEqual(f.closed, True)
366 f = None
367 try:
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000368 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000369 1/0
370 except ZeroDivisionError:
371 self.assertEqual(f.closed, True)
372 else:
373 self.fail("1/0 didn't raise an exception")
374
Antoine Pitrou08838b62009-01-21 00:55:13 +0000375 # issue 5008
376 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000377 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000378 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000379 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000380 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000381 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000382 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000383 with self.open(support.TESTFN, "a") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000384 self.assert_(f.tell() > 0)
385
Guido van Rossum87429772007-04-10 21:06:59 +0000386 def test_destructor(self):
387 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000388 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000389 def __del__(self):
390 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000391 try:
392 f = super().__del__
393 except AttributeError:
394 pass
395 else:
396 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000397 def close(self):
398 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000399 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000400 def flush(self):
401 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000402 super().flush()
403 f = MyFileIO(support.TESTFN, "wb")
404 f.write(b"xxx")
405 del f
406 self.assertEqual(record, [1, 2, 3])
407 f = open(support.TESTFN, "rb")
408 self.assertEqual(f.read(), b"xxx")
409
410 def _check_base_destructor(self, base):
411 record = []
412 class MyIO(base):
413 def __init__(self):
414 # This exercises the availability of attributes on object
415 # destruction.
416 # (in the C version, close() is called by the tp_dealloc
417 # function, not by __del__)
418 self.on_del = 1
419 self.on_close = 2
420 self.on_flush = 3
421 def __del__(self):
422 record.append(self.on_del)
423 try:
424 f = super().__del__
425 except AttributeError:
426 pass
427 else:
428 f()
429 def close(self):
430 record.append(self.on_close)
431 super().close()
432 def flush(self):
433 record.append(self.on_flush)
434 super().flush()
435 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000436 del f
437 self.assertEqual(record, [1, 2, 3])
438
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000439 def test_IOBase_destructor(self):
440 self._check_base_destructor(self.IOBase)
441
442 def test_RawIOBase_destructor(self):
443 self._check_base_destructor(self.RawIOBase)
444
445 def test_BufferedIOBase_destructor(self):
446 self._check_base_destructor(self.BufferedIOBase)
447
448 def test_TextIOBase_destructor(self):
449 self._check_base_destructor(self.TextIOBase)
450
Guido van Rossum87429772007-04-10 21:06:59 +0000451 def test_close_flushes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000452 f = self.open(support.TESTFN, "wb")
Guido van Rossum2b08b382007-05-08 20:18:39 +0000453 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000454 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000455 f = self.open(support.TESTFN, "rb")
Guido van Rossum87429772007-04-10 21:06:59 +0000456 self.assertEqual(f.read(), b"xxx")
457 f.close()
Guido van Rossuma9e20242007-03-08 00:43:48 +0000458
Guido van Rossumd4103952007-04-12 05:44:49 +0000459 def test_array_writes(self):
460 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000461 n = len(a.tostring())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000462 f = self.open(support.TESTFN, "wb", 0)
Guido van Rossumd4103952007-04-12 05:44:49 +0000463 self.assertEqual(f.write(a), n)
464 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000465 f = self.open(support.TESTFN, "wb")
Guido van Rossumd4103952007-04-12 05:44:49 +0000466 self.assertEqual(f.write(a), n)
467 f.close()
468
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000469 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000470 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000471 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000472
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000473 def test_read_closed(self):
474 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000475 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000476 with self.open(support.TESTFN, "r") as f:
477 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000478 self.assertEqual(file.read(), "egg\n")
479 file.seek(0)
480 file.close()
481 self.assertRaises(ValueError, file.read)
482
483 def test_no_closefd_with_filename(self):
484 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000485 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000486
487 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000488 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000489 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000490 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000491 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000492 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000493 self.assertEqual(file.buffer.raw.closefd, False)
494
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000495 def test_garbage_collection(self):
496 # FileIO objects are collected, and collecting them flushes
497 # all data to disk.
498 f = self.FileIO(support.TESTFN, "wb")
499 f.write(b"abcxxx")
500 f.f = f
501 wr = weakref.ref(f)
502 del f
503 gc.collect()
504 self.assert_(wr() is None, wr)
505 with open(support.TESTFN, "rb") as f:
506 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000507
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000508 def test_unbounded_file(self):
509 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
510 zero = "/dev/zero"
511 if not os.path.exists(zero):
Benjamin Peterson20c3b552009-04-18 14:49:19 +0000512 self.skip("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000513 if sys.maxsize > 0x7FFFFFFF:
Benjamin Peterson20c3b552009-04-18 14:49:19 +0000514 self.skip("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000515 if support.real_max_memuse < support._2G:
Benjamin Peterson20c3b552009-04-18 14:49:19 +0000516 self.skip("test requires at least 2GB of memory")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000517 with open(zero, "rb", buffering=0) as f:
518 self.assertRaises(OverflowError, f.read)
519 with open(zero, "rb") as f:
520 self.assertRaises(OverflowError, f.read)
521 with open(zero, "r") as f:
522 self.assertRaises(OverflowError, f.read)
523
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000524class CIOTest(IOTest):
525 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000526
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000527class PyIOTest(IOTest):
528 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000529
Guido van Rossuma9e20242007-03-08 00:43:48 +0000530
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000531class CommonBufferedTests:
532 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
533
534 def test_fileno(self):
535 rawio = self.MockRawIO()
536 bufio = self.tp(rawio)
537
538 self.assertEquals(42, bufio.fileno())
539
540 def test_no_fileno(self):
541 # XXX will we always have fileno() function? If so, kill
542 # this test. Else, write it.
543 pass
544
545 def test_invalid_args(self):
546 rawio = self.MockRawIO()
547 bufio = self.tp(rawio)
548 # Invalid whence
549 self.assertRaises(ValueError, bufio.seek, 0, -1)
550 self.assertRaises(ValueError, bufio.seek, 0, 3)
551
552 def test_override_destructor(self):
553 tp = self.tp
554 record = []
555 class MyBufferedIO(tp):
556 def __del__(self):
557 record.append(1)
558 try:
559 f = super().__del__
560 except AttributeError:
561 pass
562 else:
563 f()
564 def close(self):
565 record.append(2)
566 super().close()
567 def flush(self):
568 record.append(3)
569 super().flush()
570 rawio = self.MockRawIO()
571 bufio = MyBufferedIO(rawio)
572 writable = bufio.writable()
573 del bufio
574 if writable:
575 self.assertEqual(record, [1, 2, 3])
576 else:
577 self.assertEqual(record, [1, 2])
578
579 def test_context_manager(self):
580 # Test usability as a context manager
581 rawio = self.MockRawIO()
582 bufio = self.tp(rawio)
583 def _with():
584 with bufio:
585 pass
586 _with()
587 # bufio should now be closed, and using it a second time should raise
588 # a ValueError.
589 self.assertRaises(ValueError, _with)
590
591 def test_error_through_destructor(self):
592 # Test that the exception state is not modified by a destructor,
593 # even if close() fails.
594 rawio = self.CloseFailureIO()
595 def f():
596 self.tp(rawio).xyzzy
597 with support.captured_output("stderr") as s:
598 self.assertRaises(AttributeError, f)
599 s = s.getvalue().strip()
600 if s:
601 # The destructor *may* have printed an unraisable error, check it
602 self.assertEqual(len(s.splitlines()), 1)
603 self.assert_(s.startswith("Exception IOError: "), s)
604 self.assert_(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000605
606
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000607class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
608 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000609
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000610 def test_constructor(self):
611 rawio = self.MockRawIO([b"abc"])
612 bufio = self.tp(rawio)
613 bufio.__init__(rawio)
614 bufio.__init__(rawio, buffer_size=1024)
615 bufio.__init__(rawio, buffer_size=16)
616 self.assertEquals(b"abc", bufio.read())
617 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
618 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
619 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
620 rawio = self.MockRawIO([b"abc"])
621 bufio.__init__(rawio)
622 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000623
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000624 def test_read(self):
625 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
626 bufio = self.tp(rawio)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000627 self.assertEquals(b"abcdef", bufio.read(6))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000628 # Invalid args
629 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000630
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000631 def test_read1(self):
632 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
633 bufio = self.tp(rawio)
634 self.assertEquals(b"a", bufio.read(1))
635 self.assertEquals(b"b", bufio.read1(1))
636 self.assertEquals(rawio._reads, 1)
637 self.assertEquals(b"c", bufio.read1(100))
638 self.assertEquals(rawio._reads, 1)
639 self.assertEquals(b"d", bufio.read1(100))
640 self.assertEquals(rawio._reads, 2)
641 self.assertEquals(b"efg", bufio.read1(100))
642 self.assertEquals(rawio._reads, 3)
643 self.assertEquals(b"", bufio.read1(100))
644 # Invalid args
645 self.assertRaises(ValueError, bufio.read1, -1)
646
647 def test_readinto(self):
648 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
649 bufio = self.tp(rawio)
650 b = bytearray(2)
651 self.assertEquals(bufio.readinto(b), 2)
652 self.assertEquals(b, b"ab")
653 self.assertEquals(bufio.readinto(b), 2)
654 self.assertEquals(b, b"cd")
655 self.assertEquals(bufio.readinto(b), 2)
656 self.assertEquals(b, b"ef")
657 self.assertEquals(bufio.readinto(b), 1)
658 self.assertEquals(b, b"gf")
659 self.assertEquals(bufio.readinto(b), 0)
660 self.assertEquals(b, b"gf")
661
662 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000663 data = b"abcdefghi"
664 dlen = len(data)
665
666 tests = [
667 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
668 [ 100, [ 3, 3, 3], [ dlen ] ],
669 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
670 ]
671
672 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000673 rawio = self.MockFileIO(data)
674 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000675 pos = 0
676 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000677 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000678 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000679 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000680 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000681
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000682 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000683 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000684 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
685 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000686
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000687 self.assertEquals(b"abcd", bufio.read(6))
688 self.assertEquals(b"e", bufio.read(1))
689 self.assertEquals(b"fg", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000690 self.assertEquals(b"", bufio.peek(1))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000691 self.assert_(None is bufio.read())
692 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000693
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000694 def test_read_past_eof(self):
695 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
696 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000697
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000698 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000699
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000700 def test_read_all(self):
701 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
702 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000703
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000704 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000705
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000706 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000707 try:
708 # Write out many bytes with exactly the same number of 0's,
709 # 1's... 255's. This will help us check that concurrent reading
710 # doesn't duplicate or forget contents.
711 N = 1000
712 l = list(range(256)) * N
713 random.shuffle(l)
714 s = bytes(bytearray(l))
715 with io.open(support.TESTFN, "wb") as f:
716 f.write(s)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000717 with io.open(support.TESTFN, self.read_mode, buffering=0) as raw:
718 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000719 errors = []
720 results = []
721 def f():
722 try:
723 # Intra-buffer read then buffer-flushing read
724 for n in cycle([1, 19]):
725 s = bufio.read(n)
726 if not s:
727 break
728 # list.append() is atomic
729 results.append(s)
730 except Exception as e:
731 errors.append(e)
732 raise
733 threads = [threading.Thread(target=f) for x in range(20)]
734 for t in threads:
735 t.start()
736 time.sleep(0.02) # yield
737 for t in threads:
738 t.join()
739 self.assertFalse(errors,
740 "the following exceptions were caught: %r" % errors)
741 s = b''.join(results)
742 for i in range(256):
743 c = bytes(bytearray([i]))
744 self.assertEqual(s.count(c), N)
745 finally:
746 support.unlink(support.TESTFN)
747
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000748 def test_misbehaved_io(self):
749 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
750 bufio = self.tp(rawio)
751 self.assertRaises(IOError, bufio.seek, 0)
752 self.assertRaises(IOError, bufio.tell)
753
754class CBufferedReaderTest(BufferedReaderTest):
755 tp = io.BufferedReader
756
757 def test_constructor(self):
758 BufferedReaderTest.test_constructor(self)
759 # The allocation can succeed on 32-bit builds, e.g. with more
760 # than 2GB RAM and a 64-bit kernel.
761 if sys.maxsize > 0x7FFFFFFF:
762 rawio = self.MockRawIO()
763 bufio = self.tp(rawio)
764 self.assertRaises((OverflowError, MemoryError, ValueError),
765 bufio.__init__, rawio, sys.maxsize)
766
767 def test_initialization(self):
768 rawio = self.MockRawIO([b"abc"])
769 bufio = self.tp(rawio)
770 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
771 self.assertRaises(ValueError, bufio.read)
772 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
773 self.assertRaises(ValueError, bufio.read)
774 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
775 self.assertRaises(ValueError, bufio.read)
776
777 def test_misbehaved_io_read(self):
778 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
779 bufio = self.tp(rawio)
780 # _pyio.BufferedReader seems to implement reading different, so that
781 # checking this is not so easy.
782 self.assertRaises(IOError, bufio.read, 10)
783
784 def test_garbage_collection(self):
785 # C BufferedReader objects are collected.
786 # The Python version has __del__, so it ends into gc.garbage instead
787 rawio = self.FileIO(support.TESTFN, "w+b")
788 f = self.tp(rawio)
789 f.f = f
790 wr = weakref.ref(f)
791 del f
792 gc.collect()
793 self.assert_(wr() is None, wr)
794
795class PyBufferedReaderTest(BufferedReaderTest):
796 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000797
Guido van Rossuma9e20242007-03-08 00:43:48 +0000798
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000799class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
800 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000801
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000802 def test_constructor(self):
803 rawio = self.MockRawIO()
804 bufio = self.tp(rawio)
805 bufio.__init__(rawio)
806 bufio.__init__(rawio, buffer_size=1024)
807 bufio.__init__(rawio, buffer_size=16)
808 self.assertEquals(3, bufio.write(b"abc"))
809 bufio.flush()
810 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
811 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
812 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
813 bufio.__init__(rawio)
814 self.assertEquals(3, bufio.write(b"ghi"))
815 bufio.flush()
816 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
817
818 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000819 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000820 writer = self.MockRawIO()
821 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000822 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000823 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000824
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000825 def test_write_overflow(self):
826 writer = self.MockRawIO()
827 bufio = self.tp(writer, 8)
828 contents = b"abcdefghijklmnop"
829 for n in range(0, len(contents), 3):
830 bufio.write(contents[n:n+3])
831 flushed = b"".join(writer._write_stack)
832 # At least (total - 8) bytes were implicitly flushed, perhaps more
833 # depending on the implementation.
834 self.assert_(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000835
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000836 def check_writes(self, intermediate_func):
837 # Lots of writes, test the flushed output is as expected.
838 contents = bytes(range(256)) * 1000
839 n = 0
840 writer = self.MockRawIO()
841 bufio = self.tp(writer, 13)
842 # Generator of write sizes: repeat each N 15 times then proceed to N+1
843 def gen_sizes():
844 for size in count(1):
845 for i in range(15):
846 yield size
847 sizes = gen_sizes()
848 while n < len(contents):
849 size = min(next(sizes), len(contents) - n)
850 self.assertEquals(bufio.write(contents[n:n+size]), size)
851 intermediate_func(bufio)
852 n += size
853 bufio.flush()
854 self.assertEquals(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000855
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000856 def test_writes(self):
857 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000858
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000859 def test_writes_and_flushes(self):
860 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +0000861
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000862 def test_writes_and_seeks(self):
863 def _seekabs(bufio):
864 pos = bufio.tell()
865 bufio.seek(pos + 1, 0)
866 bufio.seek(pos - 1, 0)
867 bufio.seek(pos, 0)
868 self.check_writes(_seekabs)
869 def _seekrel(bufio):
870 pos = bufio.seek(0, 1)
871 bufio.seek(+1, 1)
872 bufio.seek(-1, 1)
873 bufio.seek(pos, 0)
874 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +0000875
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000876 def test_writes_and_truncates(self):
877 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +0000878
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000879 def test_write_non_blocking(self):
880 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +0000881 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +0000882
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000883 self.assertEquals(bufio.write(b"abcd"), 4)
884 self.assertEquals(bufio.write(b"efghi"), 5)
885 # 1 byte will be written, the rest will be buffered
886 raw.block_on(b"k")
887 self.assertEquals(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +0000888
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000889 # 8 bytes will be written, 8 will be buffered and the rest will be lost
890 raw.block_on(b"0")
891 try:
892 bufio.write(b"opqrwxyz0123456789")
893 except self.BlockingIOError as e:
894 written = e.characters_written
895 else:
896 self.fail("BlockingIOError should have been raised")
897 self.assertEquals(written, 16)
898 self.assertEquals(raw.pop_written(),
899 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +0000900
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000901 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
902 s = raw.pop_written()
903 # Previously buffered bytes were flushed
904 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +0000905
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000906 def test_write_and_rewind(self):
907 raw = io.BytesIO()
908 bufio = self.tp(raw, 4)
909 self.assertEqual(bufio.write(b"abcdef"), 6)
910 self.assertEqual(bufio.tell(), 6)
911 bufio.seek(0, 0)
912 self.assertEqual(bufio.write(b"XY"), 2)
913 bufio.seek(6, 0)
914 self.assertEqual(raw.getvalue(), b"XYcdef")
915 self.assertEqual(bufio.write(b"123456"), 6)
916 bufio.flush()
917 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000918
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000919 def test_flush(self):
920 writer = self.MockRawIO()
921 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000922 bufio.write(b"abc")
923 bufio.flush()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000924 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000925
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000926 def test_destructor(self):
927 writer = self.MockRawIO()
928 bufio = self.tp(writer, 8)
929 bufio.write(b"abc")
930 del bufio
931 self.assertEquals(b"abc", writer._write_stack[0])
932
933 def test_truncate(self):
934 # Truncate implicitly flushes the buffer.
935 with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
936 bufio = self.tp(raw, 8)
937 bufio.write(b"abcdef")
938 self.assertEqual(bufio.truncate(3), 3)
939 self.assertEqual(bufio.tell(), 3)
940 with io.open(support.TESTFN, "rb", buffering=0) as f:
941 self.assertEqual(f.read(), b"abc")
942
943 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000944 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000945 # Write out many bytes from many threads and test they were
946 # all flushed.
947 N = 1000
948 contents = bytes(range(256)) * N
949 sizes = cycle([1, 19])
950 n = 0
951 queue = deque()
952 while n < len(contents):
953 size = next(sizes)
954 queue.append(contents[n:n+size])
955 n += size
956 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +0000957 # We use a real file object because it allows us to
958 # exercise situations where the GIL is released before
959 # writing the buffer to the raw streams. This is in addition
960 # to concurrency issues due to switching threads in the middle
961 # of Python code.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000962 with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
963 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000964 errors = []
965 def f():
966 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000967 while True:
968 try:
969 s = queue.popleft()
970 except IndexError:
971 return
Antoine Pitrou87695762008-08-14 22:44:29 +0000972 bufio.write(s)
973 except Exception as e:
974 errors.append(e)
975 raise
976 threads = [threading.Thread(target=f) for x in range(20)]
977 for t in threads:
978 t.start()
979 time.sleep(0.02) # yield
980 for t in threads:
981 t.join()
982 self.assertFalse(errors,
983 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000984 bufio.close()
985 with io.open(support.TESTFN, "rb") as f:
986 s = f.read()
987 for i in range(256):
988 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +0000989 finally:
990 support.unlink(support.TESTFN)
991
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000992 def test_misbehaved_io(self):
993 rawio = self.MisbehavedRawIO()
994 bufio = self.tp(rawio, 5)
995 self.assertRaises(IOError, bufio.seek, 0)
996 self.assertRaises(IOError, bufio.tell)
997 self.assertRaises(IOError, bufio.write, b"abcdef")
998
Benjamin Peterson59406a92009-03-26 17:10:29 +0000999 def test_max_buffer_size_deprecation(self):
1000 with support.check_warnings() as w:
1001 warnings.simplefilter("always", DeprecationWarning)
1002 self.tp(self.MockRawIO(), 8, 12)
1003 self.assertEqual(len(w.warnings), 1)
1004 warning = w.warnings[0]
1005 self.assertTrue(warning.category is DeprecationWarning)
1006 self.assertEqual(str(warning.message),
1007 "max_buffer_size is deprecated")
1008
1009
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001010class CBufferedWriterTest(BufferedWriterTest):
1011 tp = io.BufferedWriter
1012
1013 def test_constructor(self):
1014 BufferedWriterTest.test_constructor(self)
1015 # The allocation can succeed on 32-bit builds, e.g. with more
1016 # than 2GB RAM and a 64-bit kernel.
1017 if sys.maxsize > 0x7FFFFFFF:
1018 rawio = self.MockRawIO()
1019 bufio = self.tp(rawio)
1020 self.assertRaises((OverflowError, MemoryError, ValueError),
1021 bufio.__init__, rawio, sys.maxsize)
1022
1023 def test_initialization(self):
1024 rawio = self.MockRawIO()
1025 bufio = self.tp(rawio)
1026 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1027 self.assertRaises(ValueError, bufio.write, b"def")
1028 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1029 self.assertRaises(ValueError, bufio.write, b"def")
1030 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1031 self.assertRaises(ValueError, bufio.write, b"def")
1032
1033 def test_garbage_collection(self):
1034 # C BufferedWriter objects are collected, and collecting them flushes
1035 # all data to disk.
1036 # The Python version has __del__, so it ends into gc.garbage instead
1037 rawio = self.FileIO(support.TESTFN, "w+b")
1038 f = self.tp(rawio)
1039 f.write(b"123xxx")
1040 f.x = f
1041 wr = weakref.ref(f)
1042 del f
1043 gc.collect()
1044 self.assert_(wr() is None, wr)
1045 with open(support.TESTFN, "rb") as f:
1046 self.assertEqual(f.read(), b"123xxx")
1047
1048
1049class PyBufferedWriterTest(BufferedWriterTest):
1050 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001051
Guido van Rossum01a27522007-03-07 01:00:12 +00001052class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001053
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001054 def test_constructor(self):
1055 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001056 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001057
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001058 def test_constructor_max_buffer_size_deprecation(self):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001059 with support.check_warnings() as w:
1060 warnings.simplefilter("always", DeprecationWarning)
1061 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1062 self.assertEqual(len(w.warnings), 1)
1063 warning = w.warnings[0]
1064 self.assertTrue(warning.category is DeprecationWarning)
1065 self.assertEqual(str(warning.message),
1066 "max_buffer_size is deprecated")
1067
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001068 def test_constructor_with_not_readable(self):
1069 class NotReadable(MockRawIO):
1070 def readable(self):
1071 return False
1072
1073 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1074
1075 def test_constructor_with_not_writeable(self):
1076 class NotWriteable(MockRawIO):
1077 def writable(self):
1078 return False
1079
1080 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1081
1082 def test_read(self):
1083 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1084
1085 self.assertEqual(pair.read(3), b"abc")
1086 self.assertEqual(pair.read(1), b"d")
1087 self.assertEqual(pair.read(), b"ef")
1088
1089 def test_read1(self):
1090 # .read1() is delegated to the underlying reader object, so this test
1091 # can be shallow.
1092 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1093
1094 self.assertEqual(pair.read1(3), b"abc")
1095
1096 def test_readinto(self):
1097 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1098
1099 data = bytearray(5)
1100 self.assertEqual(pair.readinto(data), 5)
1101 self.assertEqual(data, b"abcde")
1102
1103 def test_write(self):
1104 w = self.MockRawIO()
1105 pair = self.tp(self.MockRawIO(), w)
1106
1107 pair.write(b"abc")
1108 pair.flush()
1109 pair.write(b"def")
1110 pair.flush()
1111 self.assertEqual(w._write_stack, [b"abc", b"def"])
1112
1113 def test_peek(self):
1114 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1115
1116 self.assertTrue(pair.peek(3).startswith(b"abc"))
1117 self.assertEqual(pair.read(3), b"abc")
1118
1119 def test_readable(self):
1120 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1121 self.assertTrue(pair.readable())
1122
1123 def test_writeable(self):
1124 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1125 self.assertTrue(pair.writable())
1126
1127 def test_seekable(self):
1128 # BufferedRWPairs are never seekable, even if their readers and writers
1129 # are.
1130 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1131 self.assertFalse(pair.seekable())
1132
1133 # .flush() is delegated to the underlying writer object and has been
1134 # tested in the test_write method.
1135
1136 def test_close_and_closed(self):
1137 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1138 self.assertFalse(pair.closed)
1139 pair.close()
1140 self.assertTrue(pair.closed)
1141
1142 def test_isatty(self):
1143 class SelectableIsAtty(MockRawIO):
1144 def __init__(self, isatty):
1145 MockRawIO.__init__(self)
1146 self._isatty = isatty
1147
1148 def isatty(self):
1149 return self._isatty
1150
1151 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1152 self.assertFalse(pair.isatty())
1153
1154 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1155 self.assertTrue(pair.isatty())
1156
1157 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1158 self.assertTrue(pair.isatty())
1159
1160 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1161 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001162
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001163class CBufferedRWPairTest(BufferedRWPairTest):
1164 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001165
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001166class PyBufferedRWPairTest(BufferedRWPairTest):
1167 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001168
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001169
1170class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1171 read_mode = "rb+"
1172 write_mode = "wb+"
1173
1174 def test_constructor(self):
1175 BufferedReaderTest.test_constructor(self)
1176 BufferedWriterTest.test_constructor(self)
1177
1178 def test_read_and_write(self):
1179 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001180 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001181
1182 self.assertEqual(b"as", rw.read(2))
1183 rw.write(b"ddd")
1184 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001185 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001186 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001187 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001188
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001189 def test_seek_and_tell(self):
1190 raw = self.BytesIO(b"asdfghjkl")
1191 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001192
1193 self.assertEquals(b"as", rw.read(2))
1194 self.assertEquals(2, rw.tell())
1195 rw.seek(0, 0)
1196 self.assertEquals(b"asdf", rw.read(4))
1197
1198 rw.write(b"asdf")
1199 rw.seek(0, 0)
1200 self.assertEquals(b"asdfasdfl", rw.read())
1201 self.assertEquals(9, rw.tell())
1202 rw.seek(-4, 2)
1203 self.assertEquals(5, rw.tell())
1204 rw.seek(2, 1)
1205 self.assertEquals(7, rw.tell())
1206 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001207 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001208
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001209 def check_flush_and_read(self, read_func):
1210 raw = self.BytesIO(b"abcdefghi")
1211 bufio = self.tp(raw)
1212
1213 self.assertEquals(b"ab", read_func(bufio, 2))
1214 bufio.write(b"12")
1215 self.assertEquals(b"ef", read_func(bufio, 2))
1216 self.assertEquals(6, bufio.tell())
1217 bufio.flush()
1218 self.assertEquals(6, bufio.tell())
1219 self.assertEquals(b"ghi", read_func(bufio))
1220 raw.seek(0, 0)
1221 raw.write(b"XYZ")
1222 # flush() resets the read buffer
1223 bufio.flush()
1224 bufio.seek(0, 0)
1225 self.assertEquals(b"XYZ", read_func(bufio, 3))
1226
1227 def test_flush_and_read(self):
1228 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1229
1230 def test_flush_and_readinto(self):
1231 def _readinto(bufio, n=-1):
1232 b = bytearray(n if n >= 0 else 9999)
1233 n = bufio.readinto(b)
1234 return bytes(b[:n])
1235 self.check_flush_and_read(_readinto)
1236
1237 def test_flush_and_peek(self):
1238 def _peek(bufio, n=-1):
1239 # This relies on the fact that the buffer can contain the whole
1240 # raw stream, otherwise peek() can return less.
1241 b = bufio.peek(n)
1242 if n != -1:
1243 b = b[:n]
1244 bufio.seek(len(b), 1)
1245 return b
1246 self.check_flush_and_read(_peek)
1247
1248 def test_flush_and_write(self):
1249 raw = self.BytesIO(b"abcdefghi")
1250 bufio = self.tp(raw)
1251
1252 bufio.write(b"123")
1253 bufio.flush()
1254 bufio.write(b"45")
1255 bufio.flush()
1256 bufio.seek(0, 0)
1257 self.assertEquals(b"12345fghi", raw.getvalue())
1258 self.assertEquals(b"12345fghi", bufio.read())
1259
1260 def test_threads(self):
1261 BufferedReaderTest.test_threads(self)
1262 BufferedWriterTest.test_threads(self)
1263
1264 def test_writes_and_peek(self):
1265 def _peek(bufio):
1266 bufio.peek(1)
1267 self.check_writes(_peek)
1268 def _peek(bufio):
1269 pos = bufio.tell()
1270 bufio.seek(-1, 1)
1271 bufio.peek(1)
1272 bufio.seek(pos, 0)
1273 self.check_writes(_peek)
1274
1275 def test_writes_and_reads(self):
1276 def _read(bufio):
1277 bufio.seek(-1, 1)
1278 bufio.read(1)
1279 self.check_writes(_read)
1280
1281 def test_writes_and_read1s(self):
1282 def _read1(bufio):
1283 bufio.seek(-1, 1)
1284 bufio.read1(1)
1285 self.check_writes(_read1)
1286
1287 def test_writes_and_readintos(self):
1288 def _read(bufio):
1289 bufio.seek(-1, 1)
1290 bufio.readinto(bytearray(1))
1291 self.check_writes(_read)
1292
1293 def test_misbehaved_io(self):
1294 BufferedReaderTest.test_misbehaved_io(self)
1295 BufferedWriterTest.test_misbehaved_io(self)
1296
1297class CBufferedRandomTest(BufferedRandomTest):
1298 tp = io.BufferedRandom
1299
1300 def test_constructor(self):
1301 BufferedRandomTest.test_constructor(self)
1302 # The allocation can succeed on 32-bit builds, e.g. with more
1303 # than 2GB RAM and a 64-bit kernel.
1304 if sys.maxsize > 0x7FFFFFFF:
1305 rawio = self.MockRawIO()
1306 bufio = self.tp(rawio)
1307 self.assertRaises((OverflowError, MemoryError, ValueError),
1308 bufio.__init__, rawio, sys.maxsize)
1309
1310 def test_garbage_collection(self):
1311 CBufferedReaderTest.test_garbage_collection(self)
1312 CBufferedWriterTest.test_garbage_collection(self)
1313
1314class PyBufferedRandomTest(BufferedRandomTest):
1315 tp = pyio.BufferedRandom
1316
1317
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001318# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1319# properties:
1320# - A single output character can correspond to many bytes of input.
1321# - The number of input bytes to complete the character can be
1322# undetermined until the last input byte is received.
1323# - The number of input bytes can vary depending on previous input.
1324# - A single input byte can correspond to many characters of output.
1325# - The number of output characters can be undetermined until the
1326# last input byte is received.
1327# - The number of output characters can vary depending on previous input.
1328
1329class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1330 """
1331 For testing seek/tell behavior with a stateful, buffering decoder.
1332
1333 Input is a sequence of words. Words may be fixed-length (length set
1334 by input) or variable-length (period-terminated). In variable-length
1335 mode, extra periods are ignored. Possible words are:
1336 - 'i' followed by a number sets the input length, I (maximum 99).
1337 When I is set to 0, words are space-terminated.
1338 - 'o' followed by a number sets the output length, O (maximum 99).
1339 - Any other word is converted into a word followed by a period on
1340 the output. The output word consists of the input word truncated
1341 or padded out with hyphens to make its length equal to O. If O
1342 is 0, the word is output verbatim without truncating or padding.
1343 I and O are initially set to 1. When I changes, any buffered input is
1344 re-scanned according to the new I. EOF also terminates the last word.
1345 """
1346
1347 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001348 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001349 self.reset()
1350
1351 def __repr__(self):
1352 return '<SID %x>' % id(self)
1353
1354 def reset(self):
1355 self.i = 1
1356 self.o = 1
1357 self.buffer = bytearray()
1358
1359 def getstate(self):
1360 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1361 return bytes(self.buffer), i*100 + o
1362
1363 def setstate(self, state):
1364 buffer, io = state
1365 self.buffer = bytearray(buffer)
1366 i, o = divmod(io, 100)
1367 self.i, self.o = i ^ 1, o ^ 1
1368
1369 def decode(self, input, final=False):
1370 output = ''
1371 for b in input:
1372 if self.i == 0: # variable-length, terminated with period
1373 if b == ord('.'):
1374 if self.buffer:
1375 output += self.process_word()
1376 else:
1377 self.buffer.append(b)
1378 else: # fixed-length, terminate after self.i bytes
1379 self.buffer.append(b)
1380 if len(self.buffer) == self.i:
1381 output += self.process_word()
1382 if final and self.buffer: # EOF terminates the last word
1383 output += self.process_word()
1384 return output
1385
1386 def process_word(self):
1387 output = ''
1388 if self.buffer[0] == ord('i'):
1389 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1390 elif self.buffer[0] == ord('o'):
1391 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1392 else:
1393 output = self.buffer.decode('ascii')
1394 if len(output) < self.o:
1395 output += '-'*self.o # pad out with hyphens
1396 if self.o:
1397 output = output[:self.o] # truncate to output length
1398 output += '.'
1399 self.buffer = bytearray()
1400 return output
1401
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001402 codecEnabled = False
1403
1404 @classmethod
1405 def lookupTestDecoder(cls, name):
1406 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001407 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001408 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001409 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001410 incrementalencoder=None,
1411 streamreader=None, streamwriter=None,
1412 incrementaldecoder=cls)
1413
1414# Register the previous decoder for testing.
1415# Disabled by default, tests will enable it.
1416codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1417
1418
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001419class StatefulIncrementalDecoderTest(unittest.TestCase):
1420 """
1421 Make sure the StatefulIncrementalDecoder actually works.
1422 """
1423
1424 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001425 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001426 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001427 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001428 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001429 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001430 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001431 # I=0, O=6 (variable-length input, fixed-length output)
1432 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1433 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001434 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001435 # I=6, O=3 (fixed-length input > fixed-length output)
1436 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1437 # I=0, then 3; O=29, then 15 (with longer output)
1438 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1439 'a----------------------------.' +
1440 'b----------------------------.' +
1441 'cde--------------------------.' +
1442 'abcdefghijabcde.' +
1443 'a.b------------.' +
1444 '.c.------------.' +
1445 'd.e------------.' +
1446 'k--------------.' +
1447 'l--------------.' +
1448 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001449 ]
1450
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001451 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001452 # Try a few one-shot test cases.
1453 for input, eof, output in self.test_cases:
1454 d = StatefulIncrementalDecoder()
1455 self.assertEquals(d.decode(input, eof), output)
1456
1457 # Also test an unfinished decode, followed by forcing EOF.
1458 d = StatefulIncrementalDecoder()
1459 self.assertEquals(d.decode(b'oiabcd'), '')
1460 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001461
1462class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001463
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001464 def setUp(self):
1465 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1466 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001467 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001468
Guido van Rossumd0712812007-04-11 16:32:43 +00001469 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001470 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001471
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001472 def test_constructor(self):
1473 r = self.BytesIO(b"\xc3\xa9\n\n")
1474 b = self.BufferedReader(r, 1000)
1475 t = self.TextIOWrapper(b)
1476 t.__init__(b, encoding="latin1", newline="\r\n")
1477 self.assertEquals(t.encoding, "latin1")
1478 self.assertEquals(t.line_buffering, False)
1479 t.__init__(b, encoding="utf8", line_buffering=True)
1480 self.assertEquals(t.encoding, "utf8")
1481 self.assertEquals(t.line_buffering, True)
1482 self.assertEquals("\xe9\n", t.readline())
1483 self.assertRaises(TypeError, t.__init__, b, newline=42)
1484 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1485
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001486 def test_repr(self):
1487 raw = self.BytesIO("hello".encode("utf-8"))
1488 b = self.BufferedReader(raw)
1489 t = self.TextIOWrapper(b, encoding="utf-8")
1490 self.assertEqual(repr(t), "<TextIOWrapper encoding=utf-8>")
1491
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001492 def test_line_buffering(self):
1493 r = self.BytesIO()
1494 b = self.BufferedWriter(r, 1000)
1495 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001496 t.write("X")
1497 self.assertEquals(r.getvalue(), b"") # No flush happened
1498 t.write("Y\nZ")
1499 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1500 t.write("A\rB")
1501 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1502
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001503 def test_encoding(self):
1504 # Check the encoding attribute is always set, and valid
1505 b = self.BytesIO()
1506 t = self.TextIOWrapper(b, encoding="utf8")
1507 self.assertEqual(t.encoding, "utf8")
1508 t = self.TextIOWrapper(b)
1509 self.assert_(t.encoding is not None)
1510 codecs.lookup(t.encoding)
1511
1512 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001513 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001514 b = self.BytesIO(b"abc\n\xff\n")
1515 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001516 self.assertRaises(UnicodeError, t.read)
1517 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001518 b = self.BytesIO(b"abc\n\xff\n")
1519 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001520 self.assertRaises(UnicodeError, t.read)
1521 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001522 b = self.BytesIO(b"abc\n\xff\n")
1523 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001524 self.assertEquals(t.read(), "abc\n\n")
1525 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001526 b = self.BytesIO(b"abc\n\xff\n")
1527 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001528 self.assertEquals(t.read(), "abc\n\ufffd\n")
1529
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001530 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001531 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001532 b = self.BytesIO()
1533 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001534 self.assertRaises(UnicodeError, t.write, "\xff")
1535 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001536 b = self.BytesIO()
1537 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001538 self.assertRaises(UnicodeError, t.write, "\xff")
1539 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001540 b = self.BytesIO()
1541 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001542 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001543 t.write("abc\xffdef\n")
1544 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001545 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001546 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001547 b = self.BytesIO()
1548 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001549 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001550 t.write("abc\xffdef\n")
1551 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001552 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001553
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001554 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001555 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1556
1557 tests = [
1558 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001559 [ '', input_lines ],
1560 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1561 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1562 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001563 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001564 encodings = (
1565 'utf-8', 'latin-1',
1566 'utf-16', 'utf-16-le', 'utf-16-be',
1567 'utf-32', 'utf-32-le', 'utf-32-be',
1568 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001569
Guido van Rossum8358db22007-08-18 21:39:55 +00001570 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001571 # character in TextIOWrapper._pending_line.
1572 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001573 # XXX: str.encode() should return bytes
1574 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001575 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001576 for bufsize in range(1, 10):
1577 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001578 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1579 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001580 encoding=encoding)
1581 if do_reads:
1582 got_lines = []
1583 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001584 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001585 if c2 == '':
1586 break
1587 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001588 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001589 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001590 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001591
1592 for got_line, exp_line in zip(got_lines, exp_lines):
1593 self.assertEquals(got_line, exp_line)
1594 self.assertEquals(len(got_lines), len(exp_lines))
1595
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001596 def test_newlines_input(self):
1597 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001598 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1599 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001600 (None, normalized.decode("ascii").splitlines(True)),
1601 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001602 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1603 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1604 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001605 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001606 buf = self.BytesIO(testdata)
1607 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001608 self.assertEquals(txt.readlines(), expected)
1609 txt.seek(0)
1610 self.assertEquals(txt.read(), "".join(expected))
1611
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001612 def test_newlines_output(self):
1613 testdict = {
1614 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1615 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1616 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1617 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1618 }
1619 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1620 for newline, expected in tests:
1621 buf = self.BytesIO()
1622 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1623 txt.write("AAA\nB")
1624 txt.write("BB\nCCC\n")
1625 txt.write("X\rY\r\nZ")
1626 txt.flush()
1627 self.assertEquals(buf.closed, False)
1628 self.assertEquals(buf.getvalue(), expected)
1629
1630 def test_destructor(self):
1631 l = []
1632 base = self.BytesIO
1633 class MyBytesIO(base):
1634 def close(self):
1635 l.append(self.getvalue())
1636 base.close(self)
1637 b = MyBytesIO()
1638 t = self.TextIOWrapper(b, encoding="ascii")
1639 t.write("abc")
1640 del t
1641 self.assertEquals([b"abc"], l)
1642
1643 def test_override_destructor(self):
1644 record = []
1645 class MyTextIO(self.TextIOWrapper):
1646 def __del__(self):
1647 record.append(1)
1648 try:
1649 f = super().__del__
1650 except AttributeError:
1651 pass
1652 else:
1653 f()
1654 def close(self):
1655 record.append(2)
1656 super().close()
1657 def flush(self):
1658 record.append(3)
1659 super().flush()
1660 b = self.BytesIO()
1661 t = MyTextIO(b, encoding="ascii")
1662 del t
1663 self.assertEqual(record, [1, 2, 3])
1664
1665 def test_error_through_destructor(self):
1666 # Test that the exception state is not modified by a destructor,
1667 # even if close() fails.
1668 rawio = self.CloseFailureIO()
1669 def f():
1670 self.TextIOWrapper(rawio).xyzzy
1671 with support.captured_output("stderr") as s:
1672 self.assertRaises(AttributeError, f)
1673 s = s.getvalue().strip()
1674 if s:
1675 # The destructor *may* have printed an unraisable error, check it
1676 self.assertEqual(len(s.splitlines()), 1)
1677 self.assert_(s.startswith("Exception IOError: "), s)
1678 self.assert_(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001679
Guido van Rossum9b76da62007-04-11 01:09:03 +00001680 # Systematic tests of the text I/O API
1681
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001682 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001683 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1684 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001685 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001686 f._CHUNK_SIZE = chunksize
1687 self.assertEquals(f.write("abc"), 3)
1688 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001689 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001690 f._CHUNK_SIZE = chunksize
1691 self.assertEquals(f.tell(), 0)
1692 self.assertEquals(f.read(), "abc")
1693 cookie = f.tell()
1694 self.assertEquals(f.seek(0), 0)
1695 self.assertEquals(f.read(2), "ab")
1696 self.assertEquals(f.read(1), "c")
1697 self.assertEquals(f.read(1), "")
1698 self.assertEquals(f.read(), "")
1699 self.assertEquals(f.tell(), cookie)
1700 self.assertEquals(f.seek(0), 0)
1701 self.assertEquals(f.seek(0, 2), cookie)
1702 self.assertEquals(f.write("def"), 3)
1703 self.assertEquals(f.seek(cookie), cookie)
1704 self.assertEquals(f.read(), "def")
1705 if enc.startswith("utf"):
1706 self.multi_line_test(f, enc)
1707 f.close()
1708
1709 def multi_line_test(self, f, enc):
1710 f.seek(0)
1711 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001712 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001713 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001714 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001715 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001716 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001717 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001718 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001719 wlines.append((f.tell(), line))
1720 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001721 f.seek(0)
1722 rlines = []
1723 while True:
1724 pos = f.tell()
1725 line = f.readline()
1726 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001727 break
1728 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +00001729 self.assertEquals(rlines, wlines)
1730
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001731 def test_telling(self):
1732 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001733 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001734 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001735 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001736 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001737 p2 = f.tell()
1738 f.seek(0)
1739 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001740 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001741 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001742 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001743 self.assertEquals(f.tell(), p2)
1744 f.seek(0)
1745 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001746 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001747 self.assertRaises(IOError, f.tell)
1748 self.assertEquals(f.tell(), p2)
1749 f.close()
1750
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001751 def test_seeking(self):
1752 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001753 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001754 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001755 prefix = bytes(u_prefix.encode("utf-8"))
1756 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001757 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001758 suffix = bytes(u_suffix.encode("utf-8"))
1759 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001760 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001761 f.write(line*2)
1762 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001763 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001764 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001765 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001766 self.assertEquals(f.tell(), prefix_size)
1767 self.assertEquals(f.readline(), u_suffix)
1768
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001769 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00001770 # Regression test for a specific bug
1771 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001772 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001773 f.write(data)
1774 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001775 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001776 f._CHUNK_SIZE # Just test that it exists
1777 f._CHUNK_SIZE = 2
1778 f.readline()
1779 f.tell()
1780
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001781 def test_seek_and_tell(self):
1782 #Test seek/tell using the StatefulIncrementalDecoder.
1783 # Make test faster by doing smaller seeks
1784 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001785
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001786 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001787 """Tell/seek to various points within a data stream and ensure
1788 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001789 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001790 f.write(data)
1791 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001792 f = self.open(support.TESTFN, encoding='test_decoder')
1793 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001794 decoded = f.read()
1795 f.close()
1796
Neal Norwitze2b07052008-03-18 19:52:05 +00001797 for i in range(min_pos, len(decoded) + 1): # seek positions
1798 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001799 f = self.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001800 self.assertEquals(f.read(i), decoded[:i])
1801 cookie = f.tell()
1802 self.assertEquals(f.read(j), decoded[i:i + j])
1803 f.seek(cookie)
1804 self.assertEquals(f.read(), decoded[i:])
1805 f.close()
1806
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001807 # Enable the test decoder.
1808 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001809
1810 # Run the tests.
1811 try:
1812 # Try each test case.
1813 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001814 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001815
1816 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001817 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1818 offset = CHUNK_SIZE - len(input)//2
1819 prefix = b'.'*offset
1820 # Don't bother seeking into the prefix (takes too long).
1821 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001822 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001823
1824 # Ensure our test decoder won't interfere with subsequent tests.
1825 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001826 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001827
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001828 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001829 data = "1234567890"
1830 tests = ("utf-16",
1831 "utf-16-le",
1832 "utf-16-be",
1833 "utf-32",
1834 "utf-32-le",
1835 "utf-32-be")
1836 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001837 buf = self.BytesIO()
1838 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001839 # Check if the BOM is written only once (see issue1753).
1840 f.write(data)
1841 f.write(data)
1842 f.seek(0)
1843 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00001844 f.seek(0)
1845 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001846 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1847
Benjamin Petersona1b49012009-03-31 23:11:32 +00001848 def test_unreadable(self):
1849 class UnReadable(self.BytesIO):
1850 def readable(self):
1851 return False
1852 txt = self.TextIOWrapper(UnReadable())
1853 self.assertRaises(IOError, txt.read)
1854
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001855 def test_read_one_by_one(self):
1856 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001857 reads = ""
1858 while True:
1859 c = txt.read(1)
1860 if not c:
1861 break
1862 reads += c
1863 self.assertEquals(reads, "AA\nBB")
1864
1865 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001866 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001867 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001868 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001869 reads = ""
1870 while True:
1871 c = txt.read(128)
1872 if not c:
1873 break
1874 reads += c
1875 self.assertEquals(reads, "A"*127+"\nB")
1876
1877 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001878 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001879
1880 # read one char at a time
1881 reads = ""
1882 while True:
1883 c = txt.read(1)
1884 if not c:
1885 break
1886 reads += c
1887 self.assertEquals(reads, self.normalized)
1888
1889 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001890 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001891 txt._CHUNK_SIZE = 4
1892
1893 reads = ""
1894 while True:
1895 c = txt.read(4)
1896 if not c:
1897 break
1898 reads += c
1899 self.assertEquals(reads, self.normalized)
1900
1901 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001902 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001903 txt._CHUNK_SIZE = 4
1904
1905 reads = txt.read(4)
1906 reads += txt.read(4)
1907 reads += txt.readline()
1908 reads += txt.readline()
1909 reads += txt.readline()
1910 self.assertEquals(reads, self.normalized)
1911
1912 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001913 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001914 txt._CHUNK_SIZE = 4
1915
1916 reads = txt.read(4)
1917 reads += txt.read()
1918 self.assertEquals(reads, self.normalized)
1919
1920 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001921 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001922 txt._CHUNK_SIZE = 4
1923
1924 reads = txt.read(4)
1925 pos = txt.tell()
1926 txt.seek(0)
1927 txt.seek(pos)
1928 self.assertEquals(txt.read(4), "BBB\n")
1929
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001930 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001931 buffer = self.BytesIO(self.testdata)
1932 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001933
1934 self.assertEqual(buffer.seekable(), txt.seekable())
1935
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001936class CTextIOWrapperTest(TextIOWrapperTest):
1937
1938 def test_initialization(self):
1939 r = self.BytesIO(b"\xc3\xa9\n\n")
1940 b = self.BufferedReader(r, 1000)
1941 t = self.TextIOWrapper(b)
1942 self.assertRaises(TypeError, t.__init__, b, newline=42)
1943 self.assertRaises(ValueError, t.read)
1944 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1945 self.assertRaises(ValueError, t.read)
1946
1947 def test_garbage_collection(self):
1948 # C TextIOWrapper objects are collected, and collecting them flushes
1949 # all data to disk.
1950 # The Python version has __del__, so it ends in gc.garbage instead.
1951 rawio = io.FileIO(support.TESTFN, "wb")
1952 b = self.BufferedWriter(rawio)
1953 t = self.TextIOWrapper(b, encoding="ascii")
1954 t.write("456def")
1955 t.x = t
1956 wr = weakref.ref(t)
1957 del t
1958 gc.collect()
1959 self.assert_(wr() is None, wr)
1960 with open(support.TESTFN, "rb") as f:
1961 self.assertEqual(f.read(), b"456def")
1962
1963class PyTextIOWrapperTest(TextIOWrapperTest):
1964 pass
1965
1966
1967class IncrementalNewlineDecoderTest(unittest.TestCase):
1968
1969 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00001970 # UTF-8 specific tests for a newline decoder
1971 def _check_decode(b, s, **kwargs):
1972 # We exercise getstate() / setstate() as well as decode()
1973 state = decoder.getstate()
1974 self.assertEquals(decoder.decode(b, **kwargs), s)
1975 decoder.setstate(state)
1976 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001977
Antoine Pitrou180a3362008-12-14 16:36:46 +00001978 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001979
Antoine Pitrou180a3362008-12-14 16:36:46 +00001980 _check_decode(b'\xe8', "")
1981 _check_decode(b'\xa2', "")
1982 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001983
Antoine Pitrou180a3362008-12-14 16:36:46 +00001984 _check_decode(b'\xe8', "")
1985 _check_decode(b'\xa2', "")
1986 _check_decode(b'\x88', "\u8888")
1987
1988 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001989 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1990
Antoine Pitrou180a3362008-12-14 16:36:46 +00001991 decoder.reset()
1992 _check_decode(b'\n', "\n")
1993 _check_decode(b'\r', "")
1994 _check_decode(b'', "\n", final=True)
1995 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001996
Antoine Pitrou180a3362008-12-14 16:36:46 +00001997 _check_decode(b'\r', "")
1998 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001999
Antoine Pitrou180a3362008-12-14 16:36:46 +00002000 _check_decode(b'\r\r\n', "\n\n")
2001 _check_decode(b'\r', "")
2002 _check_decode(b'\r', "\n")
2003 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002004
Antoine Pitrou180a3362008-12-14 16:36:46 +00002005 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2006 _check_decode(b'\xe8\xa2\x88', "\u8888")
2007 _check_decode(b'\n', "\n")
2008 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2009 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002010
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002011 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002012 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002013 if encoding is not None:
2014 encoder = codecs.getincrementalencoder(encoding)()
2015 def _decode_bytewise(s):
2016 # Decode one byte at a time
2017 for b in encoder.encode(s):
2018 result.append(decoder.decode(bytes([b])))
2019 else:
2020 encoder = None
2021 def _decode_bytewise(s):
2022 # Decode one char at a time
2023 for c in s:
2024 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002025 self.assertEquals(decoder.newlines, None)
2026 _decode_bytewise("abc\n\r")
2027 self.assertEquals(decoder.newlines, '\n')
2028 _decode_bytewise("\nabc")
2029 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2030 _decode_bytewise("abc\r")
2031 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2032 _decode_bytewise("abc")
2033 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2034 _decode_bytewise("abc\r")
2035 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2036 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002037 input = "abc"
2038 if encoder is not None:
2039 encoder.reset()
2040 input = encoder.encode(input)
2041 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002042 self.assertEquals(decoder.newlines, None)
2043
2044 def test_newline_decoder(self):
2045 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002046 # None meaning the IncrementalNewlineDecoder takes unicode input
2047 # rather than bytes input
2048 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002049 'utf-16', 'utf-16-le', 'utf-16-be',
2050 'utf-32', 'utf-32-le', 'utf-32-be',
2051 )
2052 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002053 decoder = enc and codecs.getincrementaldecoder(enc)()
2054 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2055 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002056 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002057 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2058 self.check_newline_decoding_utf8(decoder)
2059
Antoine Pitrou66913e22009-03-06 23:40:56 +00002060 def test_newline_bytes(self):
2061 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2062 def _check(dec):
2063 self.assertEquals(dec.newlines, None)
2064 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2065 self.assertEquals(dec.newlines, None)
2066 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2067 self.assertEquals(dec.newlines, None)
2068 dec = self.IncrementalNewlineDecoder(None, translate=False)
2069 _check(dec)
2070 dec = self.IncrementalNewlineDecoder(None, translate=True)
2071 _check(dec)
2072
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002073class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2074 pass
2075
2076class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2077 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002078
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002079
Guido van Rossum01a27522007-03-07 01:00:12 +00002080# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002081
Guido van Rossum5abbf752007-08-27 17:39:33 +00002082class MiscIOTest(unittest.TestCase):
2083
Barry Warsaw40e82462008-11-20 20:14:50 +00002084 def tearDown(self):
2085 support.unlink(support.TESTFN)
2086
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002087 def test___all__(self):
2088 for name in self.io.__all__:
2089 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002090 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002091 if name == "open":
2092 continue
2093 elif "error" in name.lower():
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002094 self.assertTrue(issubclass(obj, Exception), name)
2095 elif not name.startswith("SEEK_"):
2096 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002097
Barry Warsaw40e82462008-11-20 20:14:50 +00002098 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002099 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002100 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002101 f.close()
2102
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002103 f = self.open(support.TESTFN, "U")
Barry Warsaw40e82462008-11-20 20:14:50 +00002104 self.assertEquals(f.name, support.TESTFN)
2105 self.assertEquals(f.buffer.name, support.TESTFN)
2106 self.assertEquals(f.buffer.raw.name, support.TESTFN)
2107 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002108 self.assertEquals(f.buffer.mode, "rb")
2109 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002110 f.close()
2111
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002112 f = self.open(support.TESTFN, "w+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002113 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002114 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2115 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002116
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002117 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002118 self.assertEquals(g.mode, "wb")
2119 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002120 self.assertEquals(g.name, f.fileno())
2121 self.assertEquals(g.raw.name, f.fileno())
2122 f.close()
2123 g.close()
2124
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002125 def test_io_after_close(self):
2126 for kwargs in [
2127 {"mode": "w"},
2128 {"mode": "wb"},
2129 {"mode": "w", "buffering": 1},
2130 {"mode": "w", "buffering": 2},
2131 {"mode": "wb", "buffering": 0},
2132 {"mode": "r"},
2133 {"mode": "rb"},
2134 {"mode": "r", "buffering": 1},
2135 {"mode": "r", "buffering": 2},
2136 {"mode": "rb", "buffering": 0},
2137 {"mode": "w+"},
2138 {"mode": "w+b"},
2139 {"mode": "w+", "buffering": 1},
2140 {"mode": "w+", "buffering": 2},
2141 {"mode": "w+b", "buffering": 0},
2142 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002143 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002144 f.close()
2145 self.assertRaises(ValueError, f.flush)
2146 self.assertRaises(ValueError, f.fileno)
2147 self.assertRaises(ValueError, f.isatty)
2148 self.assertRaises(ValueError, f.__iter__)
2149 if hasattr(f, "peek"):
2150 self.assertRaises(ValueError, f.peek, 1)
2151 self.assertRaises(ValueError, f.read)
2152 if hasattr(f, "read1"):
2153 self.assertRaises(ValueError, f.read1, 1024)
2154 if hasattr(f, "readinto"):
2155 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2156 self.assertRaises(ValueError, f.readline)
2157 self.assertRaises(ValueError, f.readlines)
2158 self.assertRaises(ValueError, f.seek, 0)
2159 self.assertRaises(ValueError, f.tell)
2160 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002161 self.assertRaises(ValueError, f.write,
2162 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002163 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002164 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002165
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002166 def test_blockingioerror(self):
2167 # Various BlockingIOError issues
2168 self.assertRaises(TypeError, self.BlockingIOError)
2169 self.assertRaises(TypeError, self.BlockingIOError, 1)
2170 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2171 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2172 b = self.BlockingIOError(1, "")
2173 self.assertEqual(b.characters_written, 0)
2174 class C(str):
2175 pass
2176 c = C("")
2177 b = self.BlockingIOError(1, c)
2178 c.b = b
2179 b.c = c
2180 wr = weakref.ref(c)
2181 del c, b
2182 gc.collect()
2183 self.assert_(wr() is None, wr)
2184
2185 def test_abcs(self):
2186 # Test the visible base classes are ABCs.
2187 self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
2188 self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
2189 self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
2190 self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))
2191
2192 def _check_abc_inheritance(self, abcmodule):
2193 with self.open(support.TESTFN, "wb", buffering=0) as f:
2194 self.assertTrue(isinstance(f, abcmodule.IOBase))
2195 self.assertTrue(isinstance(f, abcmodule.RawIOBase))
2196 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2197 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2198 with self.open(support.TESTFN, "wb") as f:
2199 self.assertTrue(isinstance(f, abcmodule.IOBase))
2200 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2201 self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
2202 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2203 with self.open(support.TESTFN, "w") as f:
2204 self.assertTrue(isinstance(f, abcmodule.IOBase))
2205 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2206 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2207 self.assertTrue(isinstance(f, abcmodule.TextIOBase))
2208
2209 def test_abc_inheritance(self):
2210 # Test implementations inherit from their respective ABCs
2211 self._check_abc_inheritance(self)
2212
2213 def test_abc_inheritance_official(self):
2214 # Test implementations inherit from the official ABCs of the
2215 # baseline "io" module.
2216 self._check_abc_inheritance(io)
2217
2218class CMiscIOTest(MiscIOTest):
2219 io = io
2220
2221class PyMiscIOTest(MiscIOTest):
2222 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002223
Guido van Rossum28524c72007-02-27 05:47:44 +00002224def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002225 tests = (CIOTest, PyIOTest,
2226 CBufferedReaderTest, PyBufferedReaderTest,
2227 CBufferedWriterTest, PyBufferedWriterTest,
2228 CBufferedRWPairTest, PyBufferedRWPairTest,
2229 CBufferedRandomTest, PyBufferedRandomTest,
2230 StatefulIncrementalDecoderTest,
2231 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2232 CTextIOWrapperTest, PyTextIOWrapperTest,
2233 CMiscIOTest, PyMiscIOTest,)
2234
2235 # Put the namespaces of the IO module we are testing and some useful mock
2236 # classes in the __dict__ of each test.
2237 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
2238 MockNonBlockWriterIO)
2239 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2240 c_io_ns = {name : getattr(io, name) for name in all_members}
2241 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2242 globs = globals()
2243 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2244 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2245 # Avoid turning open into a bound method.
2246 py_io_ns["open"] = pyio.OpenWrapper
2247 for test in tests:
2248 if test.__name__.startswith("C"):
2249 for name, obj in c_io_ns.items():
2250 setattr(test, name, obj)
2251 elif test.__name__.startswith("Py"):
2252 for name, obj in py_io_ns.items():
2253 setattr(test, name, obj)
2254
2255 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002256
2257if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002258 test_main()