blob: 34b6d0909e6d379b70d8e167868a4edeb0d53aca [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
33from itertools import chain, cycle, count
34from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000035from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000036
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000037import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000038import io # C implementation of io
39import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000040
Guido van Rossuma9e20242007-03-08 00:43:48 +000041
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042def _default_chunk_size():
43 """Get the default TextIOWrapper chunk size"""
44 with open(__file__, "r", encoding="latin1") as f:
45 return f._CHUNK_SIZE
46
47
48class MockRawIO:
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000050 def __init__(self, read_stack=()):
51 self._read_stack = list(read_stack)
52 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000053 self._reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000054
55 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000056 self._reads += 1
Guido van Rossum68bbcd22007-02-27 17:19:33 +000057 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000058 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000059 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000060 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000061
Guido van Rossum01a27522007-03-07 01:00:12 +000062 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000063 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000064 return len(b)
65
66 def writable(self):
67 return True
68
Guido van Rossum68bbcd22007-02-27 17:19:33 +000069 def fileno(self):
70 return 42
71
72 def readable(self):
73 return True
74
Guido van Rossum01a27522007-03-07 01:00:12 +000075 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000076 return True
77
Guido van Rossum01a27522007-03-07 01:00:12 +000078 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000079 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000080
81 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000082 return 0 # same comment as above
83
84 def readinto(self, buf):
85 self._reads += 1
86 max_len = len(buf)
87 try:
88 data = self._read_stack[0]
89 except IndexError:
90 return 0
91 if data is None:
92 del self._read_stack[0]
93 return None
94 n = len(data)
95 if len(data) <= max_len:
96 del self._read_stack[0]
97 buf[:n] = data
98 return n
99 else:
100 buf[:] = data[:max_len]
101 self._read_stack[0] = data[max_len:]
102 return max_len
103
104 def truncate(self, pos=None):
105 return pos
106
107class CMockRawIO(MockRawIO, io.RawIOBase):
108 pass
109
110class PyMockRawIO(MockRawIO, pyio.RawIOBase):
111 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000112
Guido van Rossuma9e20242007-03-08 00:43:48 +0000113
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000114class MisbehavedRawIO(MockRawIO):
115 def write(self, b):
116 return super().write(b) * 2
117
118 def read(self, n=None):
119 return super().read(n) * 2
120
121 def seek(self, pos, whence):
122 return -123
123
124 def tell(self):
125 return -456
126
127 def readinto(self, buf):
128 super().readinto(buf)
129 return len(buf) * 5
130
131class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
132 pass
133
134class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
135 pass
136
137
138class CloseFailureIO(MockRawIO):
139 closed = 0
140
141 def close(self):
142 if not self.closed:
143 self.closed = 1
144 raise IOError
145
146class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
147 pass
148
149class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
150 pass
151
152
153class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000154
155 def __init__(self, data):
156 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000157 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000158
159 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000160 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000161 self.read_history.append(None if res is None else len(res))
162 return res
163
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000164 def readinto(self, b):
165 res = super().readinto(b)
166 self.read_history.append(res)
167 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000168
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169class CMockFileIO(MockFileIO, io.BytesIO):
170 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000171
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000172class PyMockFileIO(MockFileIO, pyio.BytesIO):
173 pass
174
175
176class MockNonBlockWriterIO:
177
178 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000179 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000181
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000182 def pop_written(self):
183 s = b"".join(self._write_stack)
184 self._write_stack[:] = []
185 return s
186
187 def block_on(self, char):
188 """Block when a given char is encountered."""
189 self._blocker_char = char
190
191 def readable(self):
192 return True
193
194 def seekable(self):
195 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000196
Guido van Rossum01a27522007-03-07 01:00:12 +0000197 def writable(self):
198 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000200 def write(self, b):
201 b = bytes(b)
202 n = -1
203 if self._blocker_char:
204 try:
205 n = b.index(self._blocker_char)
206 except ValueError:
207 pass
208 else:
209 self._blocker_char = None
210 self._write_stack.append(b[:n])
211 raise self.BlockingIOError(0, "test blocking", n)
212 self._write_stack.append(b)
213 return len(b)
214
215class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
216 BlockingIOError = io.BlockingIOError
217
218class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
219 BlockingIOError = pyio.BlockingIOError
220
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
Guido van Rossum28524c72007-02-27 05:47:44 +0000222class IOTest(unittest.TestCase):
223
Neal Norwitze7789b12008-03-24 06:18:09 +0000224 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000225 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000226
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000227 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000228 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000229
Guido van Rossum28524c72007-02-27 05:47:44 +0000230 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000231 self.assertEqual(f.write(b"blah."), 5)
232 self.assertEqual(f.seek(0), 0)
233 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000234 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000235 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000236 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000237 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000238 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000239 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000240 self.assertEqual(f.seek(-1, 2), 13)
241 self.assertEqual(f.tell(), 13)
242 self.assertEqual(f.truncate(12), 12)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000243 self.assertEqual(f.tell(), 12)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000244 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000245
Guido van Rossum9b76da62007-04-11 01:09:03 +0000246 def read_ops(self, f, buffered=False):
247 data = f.read(5)
248 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000249 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000250 self.assertEqual(f.readinto(data), 5)
251 self.assertEqual(data, b" worl")
252 self.assertEqual(f.readinto(data), 2)
253 self.assertEqual(len(data), 5)
254 self.assertEqual(data[:2], b"d\n")
255 self.assertEqual(f.seek(0), 0)
256 self.assertEqual(f.read(20), b"hello world\n")
257 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000258 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000259 self.assertEqual(f.seek(-6, 2), 6)
260 self.assertEqual(f.read(5), b"world")
261 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000262 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000263 self.assertEqual(f.seek(-6, 1), 5)
264 self.assertEqual(f.read(5), b" worl")
265 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000266 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000267 if buffered:
268 f.seek(0)
269 self.assertEqual(f.read(), b"hello world\n")
270 f.seek(6)
271 self.assertEqual(f.read(), b"world\n")
272 self.assertEqual(f.read(), b"")
273
Guido van Rossum34d69e52007-04-10 20:08:41 +0000274 LARGE = 2**31
275
Guido van Rossum53807da2007-04-10 19:01:47 +0000276 def large_file_ops(self, f):
277 assert f.readable()
278 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000279 self.assertEqual(f.seek(self.LARGE), self.LARGE)
280 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000281 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000282 self.assertEqual(f.tell(), self.LARGE + 3)
283 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000285 self.assertEqual(f.tell(), self.LARGE + 2)
286 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000287 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000288 self.assertEqual(f.tell(), self.LARGE + 1)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000289 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
290 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000291 self.assertEqual(f.read(2), b"x")
292
Guido van Rossum28524c72007-02-27 05:47:44 +0000293 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000294 with self.open(support.TESTFN, "wb", buffering=0) as f:
295 self.assertEqual(f.readable(), False)
296 self.assertEqual(f.writable(), True)
297 self.assertEqual(f.seekable(), True)
298 self.write_ops(f)
299 with self.open(support.TESTFN, "rb", buffering=0) as f:
300 self.assertEqual(f.readable(), True)
301 self.assertEqual(f.writable(), False)
302 self.assertEqual(f.seekable(), True)
303 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000304
Guido van Rossum87429772007-04-10 21:06:59 +0000305 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000306 with self.open(support.TESTFN, "wb") as f:
307 self.assertEqual(f.readable(), False)
308 self.assertEqual(f.writable(), True)
309 self.assertEqual(f.seekable(), True)
310 self.write_ops(f)
311 with self.open(support.TESTFN, "rb") as f:
312 self.assertEqual(f.readable(), True)
313 self.assertEqual(f.writable(), False)
314 self.assertEqual(f.seekable(), True)
315 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000316
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000317 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000318 with self.open(support.TESTFN, "wb") as f:
319 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
320 with self.open(support.TESTFN, "rb") as f:
321 self.assertEqual(f.readline(), b"abc\n")
322 self.assertEqual(f.readline(10), b"def\n")
323 self.assertEqual(f.readline(2), b"xy")
324 self.assertEqual(f.readline(4), b"zzy\n")
325 self.assertEqual(f.readline(), b"foo\x00bar\n")
326 self.assertEqual(f.readline(), b"another line")
327 self.assertRaises(TypeError, f.readline, 5.3)
328 with self.open(support.TESTFN, "r") as f:
329 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000330
Guido van Rossum28524c72007-02-27 05:47:44 +0000331 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000332 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000333 self.write_ops(f)
334 data = f.getvalue()
335 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000336 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000337 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000338
Guido van Rossum53807da2007-04-10 19:01:47 +0000339 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000340 # On Windows and Mac OSX this test comsumes large resources; It takes
341 # a long time to build the >2GB file and takes >2GB of disk space
342 # therefore the resource must be enabled to run this test.
343 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000344 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000345 print("\nTesting large file ops skipped on %s." % sys.platform,
346 file=sys.stderr)
347 print("It requires %d bytes and a long time." % self.LARGE,
348 file=sys.stderr)
349 print("Use 'regrtest.py -u largefile test_io' to run it.",
350 file=sys.stderr)
351 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000352 with self.open(support.TESTFN, "w+b", 0) as f:
353 self.large_file_ops(f)
354 with self.open(support.TESTFN, "w+b") as f:
355 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000356
357 def test_with_open(self):
358 for bufsize in (0, 1, 100):
359 f = None
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000360 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000361 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000362 self.assertEqual(f.closed, True)
363 f = None
364 try:
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000365 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000366 1/0
367 except ZeroDivisionError:
368 self.assertEqual(f.closed, True)
369 else:
370 self.fail("1/0 didn't raise an exception")
371
Antoine Pitrou08838b62009-01-21 00:55:13 +0000372 # issue 5008
373 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000374 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000375 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000376 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000377 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000378 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000379 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000380 with self.open(support.TESTFN, "a") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000381 self.assert_(f.tell() > 0)
382
Guido van Rossum87429772007-04-10 21:06:59 +0000383 def test_destructor(self):
384 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000385 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000386 def __del__(self):
387 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000388 try:
389 f = super().__del__
390 except AttributeError:
391 pass
392 else:
393 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000394 def close(self):
395 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000396 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000397 def flush(self):
398 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000399 super().flush()
400 f = MyFileIO(support.TESTFN, "wb")
401 f.write(b"xxx")
402 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000403 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000404 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson45cec322009-04-24 23:14:50 +0000405 with open(support.TESTFN, "rb") as f:
406 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000407
408 def _check_base_destructor(self, base):
409 record = []
410 class MyIO(base):
411 def __init__(self):
412 # This exercises the availability of attributes on object
413 # destruction.
414 # (in the C version, close() is called by the tp_dealloc
415 # function, not by __del__)
416 self.on_del = 1
417 self.on_close = 2
418 self.on_flush = 3
419 def __del__(self):
420 record.append(self.on_del)
421 try:
422 f = super().__del__
423 except AttributeError:
424 pass
425 else:
426 f()
427 def close(self):
428 record.append(self.on_close)
429 super().close()
430 def flush(self):
431 record.append(self.on_flush)
432 super().flush()
433 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000434 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000435 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000436 self.assertEqual(record, [1, 2, 3])
437
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000438 def test_IOBase_destructor(self):
439 self._check_base_destructor(self.IOBase)
440
441 def test_RawIOBase_destructor(self):
442 self._check_base_destructor(self.RawIOBase)
443
444 def test_BufferedIOBase_destructor(self):
445 self._check_base_destructor(self.BufferedIOBase)
446
447 def test_TextIOBase_destructor(self):
448 self._check_base_destructor(self.TextIOBase)
449
Guido van Rossum87429772007-04-10 21:06:59 +0000450 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000451 with self.open(support.TESTFN, "wb") as f:
452 f.write(b"xxx")
453 with self.open(support.TESTFN, "rb") as f:
454 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000455
Guido van Rossumd4103952007-04-12 05:44:49 +0000456 def test_array_writes(self):
457 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000458 n = len(a.tostring())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000459 with self.open(support.TESTFN, "wb", 0) as f:
460 self.assertEqual(f.write(a), n)
461 with self.open(support.TESTFN, "wb") as f:
462 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000463
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000464 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000465 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000466 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000467
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000468 def test_read_closed(self):
469 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000470 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000471 with self.open(support.TESTFN, "r") as f:
472 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000473 self.assertEqual(file.read(), "egg\n")
474 file.seek(0)
475 file.close()
476 self.assertRaises(ValueError, file.read)
477
478 def test_no_closefd_with_filename(self):
479 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000480 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000481
482 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000483 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000484 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000485 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000486 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000487 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000488 self.assertEqual(file.buffer.raw.closefd, False)
489
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000490 def test_garbage_collection(self):
491 # FileIO objects are collected, and collecting them flushes
492 # all data to disk.
493 f = self.FileIO(support.TESTFN, "wb")
494 f.write(b"abcxxx")
495 f.f = f
496 wr = weakref.ref(f)
497 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000498 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000499 self.assert_(wr() is None, wr)
500 with open(support.TESTFN, "rb") as f:
501 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000502
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000503 def test_unbounded_file(self):
504 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
505 zero = "/dev/zero"
506 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000507 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000508 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000509 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000510 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000511 self.skipTest("test requires at least 2GB of memory")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000512 with open(zero, "rb", buffering=0) as f:
513 self.assertRaises(OverflowError, f.read)
514 with open(zero, "rb") as f:
515 self.assertRaises(OverflowError, f.read)
516 with open(zero, "r") as f:
517 self.assertRaises(OverflowError, f.read)
518
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000519class CIOTest(IOTest):
520 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000521
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000522class PyIOTest(IOTest):
523 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000524
Guido van Rossuma9e20242007-03-08 00:43:48 +0000525
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000526class CommonBufferedTests:
527 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
528
529 def test_fileno(self):
530 rawio = self.MockRawIO()
531 bufio = self.tp(rawio)
532
533 self.assertEquals(42, bufio.fileno())
534
535 def test_no_fileno(self):
536 # XXX will we always have fileno() function? If so, kill
537 # this test. Else, write it.
538 pass
539
540 def test_invalid_args(self):
541 rawio = self.MockRawIO()
542 bufio = self.tp(rawio)
543 # Invalid whence
544 self.assertRaises(ValueError, bufio.seek, 0, -1)
545 self.assertRaises(ValueError, bufio.seek, 0, 3)
546
547 def test_override_destructor(self):
548 tp = self.tp
549 record = []
550 class MyBufferedIO(tp):
551 def __del__(self):
552 record.append(1)
553 try:
554 f = super().__del__
555 except AttributeError:
556 pass
557 else:
558 f()
559 def close(self):
560 record.append(2)
561 super().close()
562 def flush(self):
563 record.append(3)
564 super().flush()
565 rawio = self.MockRawIO()
566 bufio = MyBufferedIO(rawio)
567 writable = bufio.writable()
568 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000569 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000570 if writable:
571 self.assertEqual(record, [1, 2, 3])
572 else:
573 self.assertEqual(record, [1, 2])
574
575 def test_context_manager(self):
576 # Test usability as a context manager
577 rawio = self.MockRawIO()
578 bufio = self.tp(rawio)
579 def _with():
580 with bufio:
581 pass
582 _with()
583 # bufio should now be closed, and using it a second time should raise
584 # a ValueError.
585 self.assertRaises(ValueError, _with)
586
587 def test_error_through_destructor(self):
588 # Test that the exception state is not modified by a destructor,
589 # even if close() fails.
590 rawio = self.CloseFailureIO()
591 def f():
592 self.tp(rawio).xyzzy
593 with support.captured_output("stderr") as s:
594 self.assertRaises(AttributeError, f)
595 s = s.getvalue().strip()
596 if s:
597 # The destructor *may* have printed an unraisable error, check it
598 self.assertEqual(len(s.splitlines()), 1)
599 self.assert_(s.startswith("Exception IOError: "), s)
600 self.assert_(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000601
602
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000603class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
604 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000605
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000606 def test_constructor(self):
607 rawio = self.MockRawIO([b"abc"])
608 bufio = self.tp(rawio)
609 bufio.__init__(rawio)
610 bufio.__init__(rawio, buffer_size=1024)
611 bufio.__init__(rawio, buffer_size=16)
612 self.assertEquals(b"abc", bufio.read())
613 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
614 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
615 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
616 rawio = self.MockRawIO([b"abc"])
617 bufio.__init__(rawio)
618 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000619
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000620 def test_read(self):
621 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
622 bufio = self.tp(rawio)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000623 self.assertEquals(b"abcdef", bufio.read(6))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000624 # Invalid args
625 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000626
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000627 def test_read1(self):
628 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
629 bufio = self.tp(rawio)
630 self.assertEquals(b"a", bufio.read(1))
631 self.assertEquals(b"b", bufio.read1(1))
632 self.assertEquals(rawio._reads, 1)
633 self.assertEquals(b"c", bufio.read1(100))
634 self.assertEquals(rawio._reads, 1)
635 self.assertEquals(b"d", bufio.read1(100))
636 self.assertEquals(rawio._reads, 2)
637 self.assertEquals(b"efg", bufio.read1(100))
638 self.assertEquals(rawio._reads, 3)
639 self.assertEquals(b"", bufio.read1(100))
640 # Invalid args
641 self.assertRaises(ValueError, bufio.read1, -1)
642
643 def test_readinto(self):
644 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
645 bufio = self.tp(rawio)
646 b = bytearray(2)
647 self.assertEquals(bufio.readinto(b), 2)
648 self.assertEquals(b, b"ab")
649 self.assertEquals(bufio.readinto(b), 2)
650 self.assertEquals(b, b"cd")
651 self.assertEquals(bufio.readinto(b), 2)
652 self.assertEquals(b, b"ef")
653 self.assertEquals(bufio.readinto(b), 1)
654 self.assertEquals(b, b"gf")
655 self.assertEquals(bufio.readinto(b), 0)
656 self.assertEquals(b, b"gf")
657
658 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000659 data = b"abcdefghi"
660 dlen = len(data)
661
662 tests = [
663 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
664 [ 100, [ 3, 3, 3], [ dlen ] ],
665 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
666 ]
667
668 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000669 rawio = self.MockFileIO(data)
670 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000671 pos = 0
672 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000673 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000674 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000675 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000676 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000677
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000678 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000679 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000680 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
681 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000682
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000683 self.assertEquals(b"abcd", bufio.read(6))
684 self.assertEquals(b"e", bufio.read(1))
685 self.assertEquals(b"fg", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000686 self.assertEquals(b"", bufio.peek(1))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000687 self.assert_(None is bufio.read())
688 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000689
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000690 def test_read_past_eof(self):
691 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
692 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000693
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000694 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000695
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000696 def test_read_all(self):
697 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
698 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000699
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000700 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000701
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000702 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000703 try:
704 # Write out many bytes with exactly the same number of 0's,
705 # 1's... 255's. This will help us check that concurrent reading
706 # doesn't duplicate or forget contents.
707 N = 1000
708 l = list(range(256)) * N
709 random.shuffle(l)
710 s = bytes(bytearray(l))
711 with io.open(support.TESTFN, "wb") as f:
712 f.write(s)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000713 with io.open(support.TESTFN, self.read_mode, buffering=0) as raw:
714 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000715 errors = []
716 results = []
717 def f():
718 try:
719 # Intra-buffer read then buffer-flushing read
720 for n in cycle([1, 19]):
721 s = bufio.read(n)
722 if not s:
723 break
724 # list.append() is atomic
725 results.append(s)
726 except Exception as e:
727 errors.append(e)
728 raise
729 threads = [threading.Thread(target=f) for x in range(20)]
730 for t in threads:
731 t.start()
732 time.sleep(0.02) # yield
733 for t in threads:
734 t.join()
735 self.assertFalse(errors,
736 "the following exceptions were caught: %r" % errors)
737 s = b''.join(results)
738 for i in range(256):
739 c = bytes(bytearray([i]))
740 self.assertEqual(s.count(c), N)
741 finally:
742 support.unlink(support.TESTFN)
743
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000744 def test_misbehaved_io(self):
745 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
746 bufio = self.tp(rawio)
747 self.assertRaises(IOError, bufio.seek, 0)
748 self.assertRaises(IOError, bufio.tell)
749
750class CBufferedReaderTest(BufferedReaderTest):
751 tp = io.BufferedReader
752
753 def test_constructor(self):
754 BufferedReaderTest.test_constructor(self)
755 # The allocation can succeed on 32-bit builds, e.g. with more
756 # than 2GB RAM and a 64-bit kernel.
757 if sys.maxsize > 0x7FFFFFFF:
758 rawio = self.MockRawIO()
759 bufio = self.tp(rawio)
760 self.assertRaises((OverflowError, MemoryError, ValueError),
761 bufio.__init__, rawio, sys.maxsize)
762
763 def test_initialization(self):
764 rawio = self.MockRawIO([b"abc"])
765 bufio = self.tp(rawio)
766 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
767 self.assertRaises(ValueError, bufio.read)
768 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
769 self.assertRaises(ValueError, bufio.read)
770 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
771 self.assertRaises(ValueError, bufio.read)
772
773 def test_misbehaved_io_read(self):
774 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
775 bufio = self.tp(rawio)
776 # _pyio.BufferedReader seems to implement reading different, so that
777 # checking this is not so easy.
778 self.assertRaises(IOError, bufio.read, 10)
779
780 def test_garbage_collection(self):
781 # C BufferedReader objects are collected.
782 # The Python version has __del__, so it ends into gc.garbage instead
783 rawio = self.FileIO(support.TESTFN, "w+b")
784 f = self.tp(rawio)
785 f.f = f
786 wr = weakref.ref(f)
787 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000788 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000789 self.assert_(wr() is None, wr)
790
791class PyBufferedReaderTest(BufferedReaderTest):
792 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000793
Guido van Rossuma9e20242007-03-08 00:43:48 +0000794
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000795class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
796 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000797
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000798 def test_constructor(self):
799 rawio = self.MockRawIO()
800 bufio = self.tp(rawio)
801 bufio.__init__(rawio)
802 bufio.__init__(rawio, buffer_size=1024)
803 bufio.__init__(rawio, buffer_size=16)
804 self.assertEquals(3, bufio.write(b"abc"))
805 bufio.flush()
806 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
807 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
808 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
809 bufio.__init__(rawio)
810 self.assertEquals(3, bufio.write(b"ghi"))
811 bufio.flush()
812 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
813
814 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000815 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000816 writer = self.MockRawIO()
817 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000818 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000819 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000820
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000821 def test_write_overflow(self):
822 writer = self.MockRawIO()
823 bufio = self.tp(writer, 8)
824 contents = b"abcdefghijklmnop"
825 for n in range(0, len(contents), 3):
826 bufio.write(contents[n:n+3])
827 flushed = b"".join(writer._write_stack)
828 # At least (total - 8) bytes were implicitly flushed, perhaps more
829 # depending on the implementation.
830 self.assert_(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000831
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000832 def check_writes(self, intermediate_func):
833 # Lots of writes, test the flushed output is as expected.
834 contents = bytes(range(256)) * 1000
835 n = 0
836 writer = self.MockRawIO()
837 bufio = self.tp(writer, 13)
838 # Generator of write sizes: repeat each N 15 times then proceed to N+1
839 def gen_sizes():
840 for size in count(1):
841 for i in range(15):
842 yield size
843 sizes = gen_sizes()
844 while n < len(contents):
845 size = min(next(sizes), len(contents) - n)
846 self.assertEquals(bufio.write(contents[n:n+size]), size)
847 intermediate_func(bufio)
848 n += size
849 bufio.flush()
850 self.assertEquals(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000851
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000852 def test_writes(self):
853 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000854
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000855 def test_writes_and_flushes(self):
856 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +0000857
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000858 def test_writes_and_seeks(self):
859 def _seekabs(bufio):
860 pos = bufio.tell()
861 bufio.seek(pos + 1, 0)
862 bufio.seek(pos - 1, 0)
863 bufio.seek(pos, 0)
864 self.check_writes(_seekabs)
865 def _seekrel(bufio):
866 pos = bufio.seek(0, 1)
867 bufio.seek(+1, 1)
868 bufio.seek(-1, 1)
869 bufio.seek(pos, 0)
870 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +0000871
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000872 def test_writes_and_truncates(self):
873 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +0000874
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000875 def test_write_non_blocking(self):
876 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +0000877 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +0000878
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000879 self.assertEquals(bufio.write(b"abcd"), 4)
880 self.assertEquals(bufio.write(b"efghi"), 5)
881 # 1 byte will be written, the rest will be buffered
882 raw.block_on(b"k")
883 self.assertEquals(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +0000884
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000885 # 8 bytes will be written, 8 will be buffered and the rest will be lost
886 raw.block_on(b"0")
887 try:
888 bufio.write(b"opqrwxyz0123456789")
889 except self.BlockingIOError as e:
890 written = e.characters_written
891 else:
892 self.fail("BlockingIOError should have been raised")
893 self.assertEquals(written, 16)
894 self.assertEquals(raw.pop_written(),
895 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +0000896
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000897 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
898 s = raw.pop_written()
899 # Previously buffered bytes were flushed
900 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +0000901
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000902 def test_write_and_rewind(self):
903 raw = io.BytesIO()
904 bufio = self.tp(raw, 4)
905 self.assertEqual(bufio.write(b"abcdef"), 6)
906 self.assertEqual(bufio.tell(), 6)
907 bufio.seek(0, 0)
908 self.assertEqual(bufio.write(b"XY"), 2)
909 bufio.seek(6, 0)
910 self.assertEqual(raw.getvalue(), b"XYcdef")
911 self.assertEqual(bufio.write(b"123456"), 6)
912 bufio.flush()
913 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000914
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000915 def test_flush(self):
916 writer = self.MockRawIO()
917 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000918 bufio.write(b"abc")
919 bufio.flush()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000920 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000921
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000922 def test_destructor(self):
923 writer = self.MockRawIO()
924 bufio = self.tp(writer, 8)
925 bufio.write(b"abc")
926 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000927 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000928 self.assertEquals(b"abc", writer._write_stack[0])
929
930 def test_truncate(self):
931 # Truncate implicitly flushes the buffer.
932 with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
933 bufio = self.tp(raw, 8)
934 bufio.write(b"abcdef")
935 self.assertEqual(bufio.truncate(3), 3)
936 self.assertEqual(bufio.tell(), 3)
937 with io.open(support.TESTFN, "rb", buffering=0) as f:
938 self.assertEqual(f.read(), b"abc")
939
940 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000941 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000942 # Write out many bytes from many threads and test they were
943 # all flushed.
944 N = 1000
945 contents = bytes(range(256)) * N
946 sizes = cycle([1, 19])
947 n = 0
948 queue = deque()
949 while n < len(contents):
950 size = next(sizes)
951 queue.append(contents[n:n+size])
952 n += size
953 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +0000954 # We use a real file object because it allows us to
955 # exercise situations where the GIL is released before
956 # writing the buffer to the raw streams. This is in addition
957 # to concurrency issues due to switching threads in the middle
958 # of Python code.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000959 with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
960 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000961 errors = []
962 def f():
963 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000964 while True:
965 try:
966 s = queue.popleft()
967 except IndexError:
968 return
Antoine Pitrou87695762008-08-14 22:44:29 +0000969 bufio.write(s)
970 except Exception as e:
971 errors.append(e)
972 raise
973 threads = [threading.Thread(target=f) for x in range(20)]
974 for t in threads:
975 t.start()
976 time.sleep(0.02) # yield
977 for t in threads:
978 t.join()
979 self.assertFalse(errors,
980 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000981 bufio.close()
982 with io.open(support.TESTFN, "rb") as f:
983 s = f.read()
984 for i in range(256):
985 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +0000986 finally:
987 support.unlink(support.TESTFN)
988
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000989 def test_misbehaved_io(self):
990 rawio = self.MisbehavedRawIO()
991 bufio = self.tp(rawio, 5)
992 self.assertRaises(IOError, bufio.seek, 0)
993 self.assertRaises(IOError, bufio.tell)
994 self.assertRaises(IOError, bufio.write, b"abcdef")
995
Benjamin Peterson59406a92009-03-26 17:10:29 +0000996 def test_max_buffer_size_deprecation(self):
997 with support.check_warnings() as w:
998 warnings.simplefilter("always", DeprecationWarning)
999 self.tp(self.MockRawIO(), 8, 12)
1000 self.assertEqual(len(w.warnings), 1)
1001 warning = w.warnings[0]
1002 self.assertTrue(warning.category is DeprecationWarning)
1003 self.assertEqual(str(warning.message),
1004 "max_buffer_size is deprecated")
1005
1006
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001007class CBufferedWriterTest(BufferedWriterTest):
1008 tp = io.BufferedWriter
1009
1010 def test_constructor(self):
1011 BufferedWriterTest.test_constructor(self)
1012 # The allocation can succeed on 32-bit builds, e.g. with more
1013 # than 2GB RAM and a 64-bit kernel.
1014 if sys.maxsize > 0x7FFFFFFF:
1015 rawio = self.MockRawIO()
1016 bufio = self.tp(rawio)
1017 self.assertRaises((OverflowError, MemoryError, ValueError),
1018 bufio.__init__, rawio, sys.maxsize)
1019
1020 def test_initialization(self):
1021 rawio = self.MockRawIO()
1022 bufio = self.tp(rawio)
1023 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1024 self.assertRaises(ValueError, bufio.write, b"def")
1025 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1026 self.assertRaises(ValueError, bufio.write, b"def")
1027 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1028 self.assertRaises(ValueError, bufio.write, b"def")
1029
1030 def test_garbage_collection(self):
1031 # C BufferedWriter objects are collected, and collecting them flushes
1032 # all data to disk.
1033 # The Python version has __del__, so it ends into gc.garbage instead
1034 rawio = self.FileIO(support.TESTFN, "w+b")
1035 f = self.tp(rawio)
1036 f.write(b"123xxx")
1037 f.x = f
1038 wr = weakref.ref(f)
1039 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001040 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001041 self.assert_(wr() is None, wr)
1042 with open(support.TESTFN, "rb") as f:
1043 self.assertEqual(f.read(), b"123xxx")
1044
1045
1046class PyBufferedWriterTest(BufferedWriterTest):
1047 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001048
Guido van Rossum01a27522007-03-07 01:00:12 +00001049class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001050
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001051 def test_constructor(self):
1052 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001053 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001054
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001055 def test_constructor_max_buffer_size_deprecation(self):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001056 with support.check_warnings() as w:
1057 warnings.simplefilter("always", DeprecationWarning)
1058 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1059 self.assertEqual(len(w.warnings), 1)
1060 warning = w.warnings[0]
1061 self.assertTrue(warning.category is DeprecationWarning)
1062 self.assertEqual(str(warning.message),
1063 "max_buffer_size is deprecated")
1064
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001065 def test_constructor_with_not_readable(self):
1066 class NotReadable(MockRawIO):
1067 def readable(self):
1068 return False
1069
1070 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1071
1072 def test_constructor_with_not_writeable(self):
1073 class NotWriteable(MockRawIO):
1074 def writable(self):
1075 return False
1076
1077 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1078
1079 def test_read(self):
1080 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1081
1082 self.assertEqual(pair.read(3), b"abc")
1083 self.assertEqual(pair.read(1), b"d")
1084 self.assertEqual(pair.read(), b"ef")
1085
1086 def test_read1(self):
1087 # .read1() is delegated to the underlying reader object, so this test
1088 # can be shallow.
1089 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1090
1091 self.assertEqual(pair.read1(3), b"abc")
1092
1093 def test_readinto(self):
1094 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1095
1096 data = bytearray(5)
1097 self.assertEqual(pair.readinto(data), 5)
1098 self.assertEqual(data, b"abcde")
1099
1100 def test_write(self):
1101 w = self.MockRawIO()
1102 pair = self.tp(self.MockRawIO(), w)
1103
1104 pair.write(b"abc")
1105 pair.flush()
1106 pair.write(b"def")
1107 pair.flush()
1108 self.assertEqual(w._write_stack, [b"abc", b"def"])
1109
1110 def test_peek(self):
1111 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1112
1113 self.assertTrue(pair.peek(3).startswith(b"abc"))
1114 self.assertEqual(pair.read(3), b"abc")
1115
1116 def test_readable(self):
1117 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1118 self.assertTrue(pair.readable())
1119
1120 def test_writeable(self):
1121 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1122 self.assertTrue(pair.writable())
1123
1124 def test_seekable(self):
1125 # BufferedRWPairs are never seekable, even if their readers and writers
1126 # are.
1127 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1128 self.assertFalse(pair.seekable())
1129
1130 # .flush() is delegated to the underlying writer object and has been
1131 # tested in the test_write method.
1132
1133 def test_close_and_closed(self):
1134 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1135 self.assertFalse(pair.closed)
1136 pair.close()
1137 self.assertTrue(pair.closed)
1138
1139 def test_isatty(self):
1140 class SelectableIsAtty(MockRawIO):
1141 def __init__(self, isatty):
1142 MockRawIO.__init__(self)
1143 self._isatty = isatty
1144
1145 def isatty(self):
1146 return self._isatty
1147
1148 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1149 self.assertFalse(pair.isatty())
1150
1151 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1152 self.assertTrue(pair.isatty())
1153
1154 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1155 self.assertTrue(pair.isatty())
1156
1157 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1158 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001159
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001160class CBufferedRWPairTest(BufferedRWPairTest):
1161 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001162
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001163class PyBufferedRWPairTest(BufferedRWPairTest):
1164 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001165
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001166
1167class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1168 read_mode = "rb+"
1169 write_mode = "wb+"
1170
1171 def test_constructor(self):
1172 BufferedReaderTest.test_constructor(self)
1173 BufferedWriterTest.test_constructor(self)
1174
1175 def test_read_and_write(self):
1176 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001177 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001178
1179 self.assertEqual(b"as", rw.read(2))
1180 rw.write(b"ddd")
1181 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001182 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001183 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001184 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001185
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001186 def test_seek_and_tell(self):
1187 raw = self.BytesIO(b"asdfghjkl")
1188 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001189
1190 self.assertEquals(b"as", rw.read(2))
1191 self.assertEquals(2, rw.tell())
1192 rw.seek(0, 0)
1193 self.assertEquals(b"asdf", rw.read(4))
1194
1195 rw.write(b"asdf")
1196 rw.seek(0, 0)
1197 self.assertEquals(b"asdfasdfl", rw.read())
1198 self.assertEquals(9, rw.tell())
1199 rw.seek(-4, 2)
1200 self.assertEquals(5, rw.tell())
1201 rw.seek(2, 1)
1202 self.assertEquals(7, rw.tell())
1203 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001204 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001205
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001206 def check_flush_and_read(self, read_func):
1207 raw = self.BytesIO(b"abcdefghi")
1208 bufio = self.tp(raw)
1209
1210 self.assertEquals(b"ab", read_func(bufio, 2))
1211 bufio.write(b"12")
1212 self.assertEquals(b"ef", read_func(bufio, 2))
1213 self.assertEquals(6, bufio.tell())
1214 bufio.flush()
1215 self.assertEquals(6, bufio.tell())
1216 self.assertEquals(b"ghi", read_func(bufio))
1217 raw.seek(0, 0)
1218 raw.write(b"XYZ")
1219 # flush() resets the read buffer
1220 bufio.flush()
1221 bufio.seek(0, 0)
1222 self.assertEquals(b"XYZ", read_func(bufio, 3))
1223
1224 def test_flush_and_read(self):
1225 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1226
1227 def test_flush_and_readinto(self):
1228 def _readinto(bufio, n=-1):
1229 b = bytearray(n if n >= 0 else 9999)
1230 n = bufio.readinto(b)
1231 return bytes(b[:n])
1232 self.check_flush_and_read(_readinto)
1233
1234 def test_flush_and_peek(self):
1235 def _peek(bufio, n=-1):
1236 # This relies on the fact that the buffer can contain the whole
1237 # raw stream, otherwise peek() can return less.
1238 b = bufio.peek(n)
1239 if n != -1:
1240 b = b[:n]
1241 bufio.seek(len(b), 1)
1242 return b
1243 self.check_flush_and_read(_peek)
1244
1245 def test_flush_and_write(self):
1246 raw = self.BytesIO(b"abcdefghi")
1247 bufio = self.tp(raw)
1248
1249 bufio.write(b"123")
1250 bufio.flush()
1251 bufio.write(b"45")
1252 bufio.flush()
1253 bufio.seek(0, 0)
1254 self.assertEquals(b"12345fghi", raw.getvalue())
1255 self.assertEquals(b"12345fghi", bufio.read())
1256
1257 def test_threads(self):
1258 BufferedReaderTest.test_threads(self)
1259 BufferedWriterTest.test_threads(self)
1260
1261 def test_writes_and_peek(self):
1262 def _peek(bufio):
1263 bufio.peek(1)
1264 self.check_writes(_peek)
1265 def _peek(bufio):
1266 pos = bufio.tell()
1267 bufio.seek(-1, 1)
1268 bufio.peek(1)
1269 bufio.seek(pos, 0)
1270 self.check_writes(_peek)
1271
1272 def test_writes_and_reads(self):
1273 def _read(bufio):
1274 bufio.seek(-1, 1)
1275 bufio.read(1)
1276 self.check_writes(_read)
1277
1278 def test_writes_and_read1s(self):
1279 def _read1(bufio):
1280 bufio.seek(-1, 1)
1281 bufio.read1(1)
1282 self.check_writes(_read1)
1283
1284 def test_writes_and_readintos(self):
1285 def _read(bufio):
1286 bufio.seek(-1, 1)
1287 bufio.readinto(bytearray(1))
1288 self.check_writes(_read)
1289
1290 def test_misbehaved_io(self):
1291 BufferedReaderTest.test_misbehaved_io(self)
1292 BufferedWriterTest.test_misbehaved_io(self)
1293
1294class CBufferedRandomTest(BufferedRandomTest):
1295 tp = io.BufferedRandom
1296
1297 def test_constructor(self):
1298 BufferedRandomTest.test_constructor(self)
1299 # The allocation can succeed on 32-bit builds, e.g. with more
1300 # than 2GB RAM and a 64-bit kernel.
1301 if sys.maxsize > 0x7FFFFFFF:
1302 rawio = self.MockRawIO()
1303 bufio = self.tp(rawio)
1304 self.assertRaises((OverflowError, MemoryError, ValueError),
1305 bufio.__init__, rawio, sys.maxsize)
1306
1307 def test_garbage_collection(self):
1308 CBufferedReaderTest.test_garbage_collection(self)
1309 CBufferedWriterTest.test_garbage_collection(self)
1310
1311class PyBufferedRandomTest(BufferedRandomTest):
1312 tp = pyio.BufferedRandom
1313
1314
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001315# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1316# properties:
1317# - A single output character can correspond to many bytes of input.
1318# - The number of input bytes to complete the character can be
1319# undetermined until the last input byte is received.
1320# - The number of input bytes can vary depending on previous input.
1321# - A single input byte can correspond to many characters of output.
1322# - The number of output characters can be undetermined until the
1323# last input byte is received.
1324# - The number of output characters can vary depending on previous input.
1325
1326class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1327 """
1328 For testing seek/tell behavior with a stateful, buffering decoder.
1329
1330 Input is a sequence of words. Words may be fixed-length (length set
1331 by input) or variable-length (period-terminated). In variable-length
1332 mode, extra periods are ignored. Possible words are:
1333 - 'i' followed by a number sets the input length, I (maximum 99).
1334 When I is set to 0, words are space-terminated.
1335 - 'o' followed by a number sets the output length, O (maximum 99).
1336 - Any other word is converted into a word followed by a period on
1337 the output. The output word consists of the input word truncated
1338 or padded out with hyphens to make its length equal to O. If O
1339 is 0, the word is output verbatim without truncating or padding.
1340 I and O are initially set to 1. When I changes, any buffered input is
1341 re-scanned according to the new I. EOF also terminates the last word.
1342 """
1343
1344 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001345 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001346 self.reset()
1347
1348 def __repr__(self):
1349 return '<SID %x>' % id(self)
1350
1351 def reset(self):
1352 self.i = 1
1353 self.o = 1
1354 self.buffer = bytearray()
1355
1356 def getstate(self):
1357 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1358 return bytes(self.buffer), i*100 + o
1359
1360 def setstate(self, state):
1361 buffer, io = state
1362 self.buffer = bytearray(buffer)
1363 i, o = divmod(io, 100)
1364 self.i, self.o = i ^ 1, o ^ 1
1365
1366 def decode(self, input, final=False):
1367 output = ''
1368 for b in input:
1369 if self.i == 0: # variable-length, terminated with period
1370 if b == ord('.'):
1371 if self.buffer:
1372 output += self.process_word()
1373 else:
1374 self.buffer.append(b)
1375 else: # fixed-length, terminate after self.i bytes
1376 self.buffer.append(b)
1377 if len(self.buffer) == self.i:
1378 output += self.process_word()
1379 if final and self.buffer: # EOF terminates the last word
1380 output += self.process_word()
1381 return output
1382
1383 def process_word(self):
1384 output = ''
1385 if self.buffer[0] == ord('i'):
1386 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1387 elif self.buffer[0] == ord('o'):
1388 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1389 else:
1390 output = self.buffer.decode('ascii')
1391 if len(output) < self.o:
1392 output += '-'*self.o # pad out with hyphens
1393 if self.o:
1394 output = output[:self.o] # truncate to output length
1395 output += '.'
1396 self.buffer = bytearray()
1397 return output
1398
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001399 codecEnabled = False
1400
1401 @classmethod
1402 def lookupTestDecoder(cls, name):
1403 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001404 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001405 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001406 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001407 incrementalencoder=None,
1408 streamreader=None, streamwriter=None,
1409 incrementaldecoder=cls)
1410
1411# Register the previous decoder for testing.
1412# Disabled by default, tests will enable it.
1413codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1414
1415
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001416class StatefulIncrementalDecoderTest(unittest.TestCase):
1417 """
1418 Make sure the StatefulIncrementalDecoder actually works.
1419 """
1420
1421 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001422 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001423 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001424 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001425 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001426 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001427 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001428 # I=0, O=6 (variable-length input, fixed-length output)
1429 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1430 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001431 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001432 # I=6, O=3 (fixed-length input > fixed-length output)
1433 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1434 # I=0, then 3; O=29, then 15 (with longer output)
1435 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1436 'a----------------------------.' +
1437 'b----------------------------.' +
1438 'cde--------------------------.' +
1439 'abcdefghijabcde.' +
1440 'a.b------------.' +
1441 '.c.------------.' +
1442 'd.e------------.' +
1443 'k--------------.' +
1444 'l--------------.' +
1445 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001446 ]
1447
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001448 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001449 # Try a few one-shot test cases.
1450 for input, eof, output in self.test_cases:
1451 d = StatefulIncrementalDecoder()
1452 self.assertEquals(d.decode(input, eof), output)
1453
1454 # Also test an unfinished decode, followed by forcing EOF.
1455 d = StatefulIncrementalDecoder()
1456 self.assertEquals(d.decode(b'oiabcd'), '')
1457 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001458
1459class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001460
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001461 def setUp(self):
1462 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1463 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001464 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001465
Guido van Rossumd0712812007-04-11 16:32:43 +00001466 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001467 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001468
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001469 def test_constructor(self):
1470 r = self.BytesIO(b"\xc3\xa9\n\n")
1471 b = self.BufferedReader(r, 1000)
1472 t = self.TextIOWrapper(b)
1473 t.__init__(b, encoding="latin1", newline="\r\n")
1474 self.assertEquals(t.encoding, "latin1")
1475 self.assertEquals(t.line_buffering, False)
1476 t.__init__(b, encoding="utf8", line_buffering=True)
1477 self.assertEquals(t.encoding, "utf8")
1478 self.assertEquals(t.line_buffering, True)
1479 self.assertEquals("\xe9\n", t.readline())
1480 self.assertRaises(TypeError, t.__init__, b, newline=42)
1481 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1482
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001483 def test_repr(self):
1484 raw = self.BytesIO("hello".encode("utf-8"))
1485 b = self.BufferedReader(raw)
1486 t = self.TextIOWrapper(b, encoding="utf-8")
1487 self.assertEqual(repr(t), "<TextIOWrapper encoding=utf-8>")
1488
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001489 def test_line_buffering(self):
1490 r = self.BytesIO()
1491 b = self.BufferedWriter(r, 1000)
1492 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001493 t.write("X")
1494 self.assertEquals(r.getvalue(), b"") # No flush happened
1495 t.write("Y\nZ")
1496 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1497 t.write("A\rB")
1498 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1499
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001500 def test_encoding(self):
1501 # Check the encoding attribute is always set, and valid
1502 b = self.BytesIO()
1503 t = self.TextIOWrapper(b, encoding="utf8")
1504 self.assertEqual(t.encoding, "utf8")
1505 t = self.TextIOWrapper(b)
1506 self.assert_(t.encoding is not None)
1507 codecs.lookup(t.encoding)
1508
1509 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001510 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001511 b = self.BytesIO(b"abc\n\xff\n")
1512 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001513 self.assertRaises(UnicodeError, t.read)
1514 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001515 b = self.BytesIO(b"abc\n\xff\n")
1516 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001517 self.assertRaises(UnicodeError, t.read)
1518 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001519 b = self.BytesIO(b"abc\n\xff\n")
1520 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001521 self.assertEquals(t.read(), "abc\n\n")
1522 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001523 b = self.BytesIO(b"abc\n\xff\n")
1524 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001525 self.assertEquals(t.read(), "abc\n\ufffd\n")
1526
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001527 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001528 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001529 b = self.BytesIO()
1530 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001531 self.assertRaises(UnicodeError, t.write, "\xff")
1532 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001533 b = self.BytesIO()
1534 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001535 self.assertRaises(UnicodeError, t.write, "\xff")
1536 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001537 b = self.BytesIO()
1538 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001539 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001540 t.write("abc\xffdef\n")
1541 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001542 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001543 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001544 b = self.BytesIO()
1545 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001546 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001547 t.write("abc\xffdef\n")
1548 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001549 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001550
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001551 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001552 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1553
1554 tests = [
1555 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001556 [ '', input_lines ],
1557 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1558 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1559 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001560 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001561 encodings = (
1562 'utf-8', 'latin-1',
1563 'utf-16', 'utf-16-le', 'utf-16-be',
1564 'utf-32', 'utf-32-le', 'utf-32-be',
1565 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001566
Guido van Rossum8358db22007-08-18 21:39:55 +00001567 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001568 # character in TextIOWrapper._pending_line.
1569 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001570 # XXX: str.encode() should return bytes
1571 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001572 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001573 for bufsize in range(1, 10):
1574 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001575 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1576 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001577 encoding=encoding)
1578 if do_reads:
1579 got_lines = []
1580 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001581 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001582 if c2 == '':
1583 break
1584 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001585 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001586 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001587 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001588
1589 for got_line, exp_line in zip(got_lines, exp_lines):
1590 self.assertEquals(got_line, exp_line)
1591 self.assertEquals(len(got_lines), len(exp_lines))
1592
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001593 def test_newlines_input(self):
1594 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001595 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1596 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001597 (None, normalized.decode("ascii").splitlines(True)),
1598 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001599 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1600 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1601 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001602 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001603 buf = self.BytesIO(testdata)
1604 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001605 self.assertEquals(txt.readlines(), expected)
1606 txt.seek(0)
1607 self.assertEquals(txt.read(), "".join(expected))
1608
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001609 def test_newlines_output(self):
1610 testdict = {
1611 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1612 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1613 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1614 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1615 }
1616 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1617 for newline, expected in tests:
1618 buf = self.BytesIO()
1619 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1620 txt.write("AAA\nB")
1621 txt.write("BB\nCCC\n")
1622 txt.write("X\rY\r\nZ")
1623 txt.flush()
1624 self.assertEquals(buf.closed, False)
1625 self.assertEquals(buf.getvalue(), expected)
1626
1627 def test_destructor(self):
1628 l = []
1629 base = self.BytesIO
1630 class MyBytesIO(base):
1631 def close(self):
1632 l.append(self.getvalue())
1633 base.close(self)
1634 b = MyBytesIO()
1635 t = self.TextIOWrapper(b, encoding="ascii")
1636 t.write("abc")
1637 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001638 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001639 self.assertEquals([b"abc"], l)
1640
1641 def test_override_destructor(self):
1642 record = []
1643 class MyTextIO(self.TextIOWrapper):
1644 def __del__(self):
1645 record.append(1)
1646 try:
1647 f = super().__del__
1648 except AttributeError:
1649 pass
1650 else:
1651 f()
1652 def close(self):
1653 record.append(2)
1654 super().close()
1655 def flush(self):
1656 record.append(3)
1657 super().flush()
1658 b = self.BytesIO()
1659 t = MyTextIO(b, encoding="ascii")
1660 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001661 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001662 self.assertEqual(record, [1, 2, 3])
1663
1664 def test_error_through_destructor(self):
1665 # Test that the exception state is not modified by a destructor,
1666 # even if close() fails.
1667 rawio = self.CloseFailureIO()
1668 def f():
1669 self.TextIOWrapper(rawio).xyzzy
1670 with support.captured_output("stderr") as s:
1671 self.assertRaises(AttributeError, f)
1672 s = s.getvalue().strip()
1673 if s:
1674 # The destructor *may* have printed an unraisable error, check it
1675 self.assertEqual(len(s.splitlines()), 1)
1676 self.assert_(s.startswith("Exception IOError: "), s)
1677 self.assert_(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001678
Guido van Rossum9b76da62007-04-11 01:09:03 +00001679 # Systematic tests of the text I/O API
1680
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001681 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001682 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1683 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001684 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001685 f._CHUNK_SIZE = chunksize
1686 self.assertEquals(f.write("abc"), 3)
1687 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001688 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001689 f._CHUNK_SIZE = chunksize
1690 self.assertEquals(f.tell(), 0)
1691 self.assertEquals(f.read(), "abc")
1692 cookie = f.tell()
1693 self.assertEquals(f.seek(0), 0)
1694 self.assertEquals(f.read(2), "ab")
1695 self.assertEquals(f.read(1), "c")
1696 self.assertEquals(f.read(1), "")
1697 self.assertEquals(f.read(), "")
1698 self.assertEquals(f.tell(), cookie)
1699 self.assertEquals(f.seek(0), 0)
1700 self.assertEquals(f.seek(0, 2), cookie)
1701 self.assertEquals(f.write("def"), 3)
1702 self.assertEquals(f.seek(cookie), cookie)
1703 self.assertEquals(f.read(), "def")
1704 if enc.startswith("utf"):
1705 self.multi_line_test(f, enc)
1706 f.close()
1707
1708 def multi_line_test(self, f, enc):
1709 f.seek(0)
1710 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001711 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001712 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001713 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001714 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001715 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001716 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001717 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001718 wlines.append((f.tell(), line))
1719 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001720 f.seek(0)
1721 rlines = []
1722 while True:
1723 pos = f.tell()
1724 line = f.readline()
1725 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001726 break
1727 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +00001728 self.assertEquals(rlines, wlines)
1729
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001730 def test_telling(self):
1731 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001732 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001733 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001734 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001735 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001736 p2 = f.tell()
1737 f.seek(0)
1738 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001739 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001740 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001741 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001742 self.assertEquals(f.tell(), p2)
1743 f.seek(0)
1744 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001745 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001746 self.assertRaises(IOError, f.tell)
1747 self.assertEquals(f.tell(), p2)
1748 f.close()
1749
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001750 def test_seeking(self):
1751 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001752 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001753 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001754 prefix = bytes(u_prefix.encode("utf-8"))
1755 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001756 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001757 suffix = bytes(u_suffix.encode("utf-8"))
1758 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001759 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001760 f.write(line*2)
1761 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001762 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001763 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001764 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001765 self.assertEquals(f.tell(), prefix_size)
1766 self.assertEquals(f.readline(), u_suffix)
1767
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001768 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00001769 # Regression test for a specific bug
1770 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001771 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001772 f.write(data)
1773 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001774 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001775 f._CHUNK_SIZE # Just test that it exists
1776 f._CHUNK_SIZE = 2
1777 f.readline()
1778 f.tell()
1779
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001780 def test_seek_and_tell(self):
1781 #Test seek/tell using the StatefulIncrementalDecoder.
1782 # Make test faster by doing smaller seeks
1783 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001784
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001785 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001786 """Tell/seek to various points within a data stream and ensure
1787 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001788 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001789 f.write(data)
1790 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001791 f = self.open(support.TESTFN, encoding='test_decoder')
1792 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001793 decoded = f.read()
1794 f.close()
1795
Neal Norwitze2b07052008-03-18 19:52:05 +00001796 for i in range(min_pos, len(decoded) + 1): # seek positions
1797 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001798 f = self.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001799 self.assertEquals(f.read(i), decoded[:i])
1800 cookie = f.tell()
1801 self.assertEquals(f.read(j), decoded[i:i + j])
1802 f.seek(cookie)
1803 self.assertEquals(f.read(), decoded[i:])
1804 f.close()
1805
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001806 # Enable the test decoder.
1807 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001808
1809 # Run the tests.
1810 try:
1811 # Try each test case.
1812 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001813 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001814
1815 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001816 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1817 offset = CHUNK_SIZE - len(input)//2
1818 prefix = b'.'*offset
1819 # Don't bother seeking into the prefix (takes too long).
1820 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001821 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001822
1823 # Ensure our test decoder won't interfere with subsequent tests.
1824 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001825 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001826
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001827 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001828 data = "1234567890"
1829 tests = ("utf-16",
1830 "utf-16-le",
1831 "utf-16-be",
1832 "utf-32",
1833 "utf-32-le",
1834 "utf-32-be")
1835 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001836 buf = self.BytesIO()
1837 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001838 # Check if the BOM is written only once (see issue1753).
1839 f.write(data)
1840 f.write(data)
1841 f.seek(0)
1842 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00001843 f.seek(0)
1844 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001845 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1846
Benjamin Petersona1b49012009-03-31 23:11:32 +00001847 def test_unreadable(self):
1848 class UnReadable(self.BytesIO):
1849 def readable(self):
1850 return False
1851 txt = self.TextIOWrapper(UnReadable())
1852 self.assertRaises(IOError, txt.read)
1853
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001854 def test_read_one_by_one(self):
1855 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001856 reads = ""
1857 while True:
1858 c = txt.read(1)
1859 if not c:
1860 break
1861 reads += c
1862 self.assertEquals(reads, "AA\nBB")
1863
1864 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001865 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001866 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001867 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001868 reads = ""
1869 while True:
1870 c = txt.read(128)
1871 if not c:
1872 break
1873 reads += c
1874 self.assertEquals(reads, "A"*127+"\nB")
1875
1876 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001877 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001878
1879 # read one char at a time
1880 reads = ""
1881 while True:
1882 c = txt.read(1)
1883 if not c:
1884 break
1885 reads += c
1886 self.assertEquals(reads, self.normalized)
1887
1888 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001889 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001890 txt._CHUNK_SIZE = 4
1891
1892 reads = ""
1893 while True:
1894 c = txt.read(4)
1895 if not c:
1896 break
1897 reads += c
1898 self.assertEquals(reads, self.normalized)
1899
1900 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001901 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001902 txt._CHUNK_SIZE = 4
1903
1904 reads = txt.read(4)
1905 reads += txt.read(4)
1906 reads += txt.readline()
1907 reads += txt.readline()
1908 reads += txt.readline()
1909 self.assertEquals(reads, self.normalized)
1910
1911 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001912 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001913 txt._CHUNK_SIZE = 4
1914
1915 reads = txt.read(4)
1916 reads += txt.read()
1917 self.assertEquals(reads, self.normalized)
1918
1919 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001920 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001921 txt._CHUNK_SIZE = 4
1922
1923 reads = txt.read(4)
1924 pos = txt.tell()
1925 txt.seek(0)
1926 txt.seek(pos)
1927 self.assertEquals(txt.read(4), "BBB\n")
1928
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001929 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001930 buffer = self.BytesIO(self.testdata)
1931 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001932
1933 self.assertEqual(buffer.seekable(), txt.seekable())
1934
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001935class CTextIOWrapperTest(TextIOWrapperTest):
1936
1937 def test_initialization(self):
1938 r = self.BytesIO(b"\xc3\xa9\n\n")
1939 b = self.BufferedReader(r, 1000)
1940 t = self.TextIOWrapper(b)
1941 self.assertRaises(TypeError, t.__init__, b, newline=42)
1942 self.assertRaises(ValueError, t.read)
1943 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1944 self.assertRaises(ValueError, t.read)
1945
1946 def test_garbage_collection(self):
1947 # C TextIOWrapper objects are collected, and collecting them flushes
1948 # all data to disk.
1949 # The Python version has __del__, so it ends in gc.garbage instead.
1950 rawio = io.FileIO(support.TESTFN, "wb")
1951 b = self.BufferedWriter(rawio)
1952 t = self.TextIOWrapper(b, encoding="ascii")
1953 t.write("456def")
1954 t.x = t
1955 wr = weakref.ref(t)
1956 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001957 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001958 self.assert_(wr() is None, wr)
1959 with open(support.TESTFN, "rb") as f:
1960 self.assertEqual(f.read(), b"456def")
1961
1962class PyTextIOWrapperTest(TextIOWrapperTest):
1963 pass
1964
1965
1966class IncrementalNewlineDecoderTest(unittest.TestCase):
1967
1968 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00001969 # UTF-8 specific tests for a newline decoder
1970 def _check_decode(b, s, **kwargs):
1971 # We exercise getstate() / setstate() as well as decode()
1972 state = decoder.getstate()
1973 self.assertEquals(decoder.decode(b, **kwargs), s)
1974 decoder.setstate(state)
1975 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001976
Antoine Pitrou180a3362008-12-14 16:36:46 +00001977 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001978
Antoine Pitrou180a3362008-12-14 16:36:46 +00001979 _check_decode(b'\xe8', "")
1980 _check_decode(b'\xa2', "")
1981 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001982
Antoine Pitrou180a3362008-12-14 16:36:46 +00001983 _check_decode(b'\xe8', "")
1984 _check_decode(b'\xa2', "")
1985 _check_decode(b'\x88', "\u8888")
1986
1987 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001988 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1989
Antoine Pitrou180a3362008-12-14 16:36:46 +00001990 decoder.reset()
1991 _check_decode(b'\n', "\n")
1992 _check_decode(b'\r', "")
1993 _check_decode(b'', "\n", final=True)
1994 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001995
Antoine Pitrou180a3362008-12-14 16:36:46 +00001996 _check_decode(b'\r', "")
1997 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001998
Antoine Pitrou180a3362008-12-14 16:36:46 +00001999 _check_decode(b'\r\r\n', "\n\n")
2000 _check_decode(b'\r', "")
2001 _check_decode(b'\r', "\n")
2002 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002003
Antoine Pitrou180a3362008-12-14 16:36:46 +00002004 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2005 _check_decode(b'\xe8\xa2\x88', "\u8888")
2006 _check_decode(b'\n', "\n")
2007 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2008 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002009
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002010 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002011 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002012 if encoding is not None:
2013 encoder = codecs.getincrementalencoder(encoding)()
2014 def _decode_bytewise(s):
2015 # Decode one byte at a time
2016 for b in encoder.encode(s):
2017 result.append(decoder.decode(bytes([b])))
2018 else:
2019 encoder = None
2020 def _decode_bytewise(s):
2021 # Decode one char at a time
2022 for c in s:
2023 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002024 self.assertEquals(decoder.newlines, None)
2025 _decode_bytewise("abc\n\r")
2026 self.assertEquals(decoder.newlines, '\n')
2027 _decode_bytewise("\nabc")
2028 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2029 _decode_bytewise("abc\r")
2030 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2031 _decode_bytewise("abc")
2032 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2033 _decode_bytewise("abc\r")
2034 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2035 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002036 input = "abc"
2037 if encoder is not None:
2038 encoder.reset()
2039 input = encoder.encode(input)
2040 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002041 self.assertEquals(decoder.newlines, None)
2042
2043 def test_newline_decoder(self):
2044 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002045 # None meaning the IncrementalNewlineDecoder takes unicode input
2046 # rather than bytes input
2047 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002048 'utf-16', 'utf-16-le', 'utf-16-be',
2049 'utf-32', 'utf-32-le', 'utf-32-be',
2050 )
2051 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002052 decoder = enc and codecs.getincrementaldecoder(enc)()
2053 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2054 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002055 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002056 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2057 self.check_newline_decoding_utf8(decoder)
2058
Antoine Pitrou66913e22009-03-06 23:40:56 +00002059 def test_newline_bytes(self):
2060 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2061 def _check(dec):
2062 self.assertEquals(dec.newlines, None)
2063 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2064 self.assertEquals(dec.newlines, None)
2065 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2066 self.assertEquals(dec.newlines, None)
2067 dec = self.IncrementalNewlineDecoder(None, translate=False)
2068 _check(dec)
2069 dec = self.IncrementalNewlineDecoder(None, translate=True)
2070 _check(dec)
2071
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002072class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2073 pass
2074
2075class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2076 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002077
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002078
Guido van Rossum01a27522007-03-07 01:00:12 +00002079# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002080
Guido van Rossum5abbf752007-08-27 17:39:33 +00002081class MiscIOTest(unittest.TestCase):
2082
Barry Warsaw40e82462008-11-20 20:14:50 +00002083 def tearDown(self):
2084 support.unlink(support.TESTFN)
2085
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002086 def test___all__(self):
2087 for name in self.io.__all__:
2088 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002089 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002090 if name == "open":
2091 continue
2092 elif "error" in name.lower():
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002093 self.assertTrue(issubclass(obj, Exception), name)
2094 elif not name.startswith("SEEK_"):
2095 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002096
Barry Warsaw40e82462008-11-20 20:14:50 +00002097 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002098 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002099 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002100 f.close()
2101
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002102 f = self.open(support.TESTFN, "U")
Barry Warsaw40e82462008-11-20 20:14:50 +00002103 self.assertEquals(f.name, support.TESTFN)
2104 self.assertEquals(f.buffer.name, support.TESTFN)
2105 self.assertEquals(f.buffer.raw.name, support.TESTFN)
2106 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002107 self.assertEquals(f.buffer.mode, "rb")
2108 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002109 f.close()
2110
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002111 f = self.open(support.TESTFN, "w+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002112 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002113 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2114 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002115
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002116 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002117 self.assertEquals(g.mode, "wb")
2118 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002119 self.assertEquals(g.name, f.fileno())
2120 self.assertEquals(g.raw.name, f.fileno())
2121 f.close()
2122 g.close()
2123
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002124 def test_io_after_close(self):
2125 for kwargs in [
2126 {"mode": "w"},
2127 {"mode": "wb"},
2128 {"mode": "w", "buffering": 1},
2129 {"mode": "w", "buffering": 2},
2130 {"mode": "wb", "buffering": 0},
2131 {"mode": "r"},
2132 {"mode": "rb"},
2133 {"mode": "r", "buffering": 1},
2134 {"mode": "r", "buffering": 2},
2135 {"mode": "rb", "buffering": 0},
2136 {"mode": "w+"},
2137 {"mode": "w+b"},
2138 {"mode": "w+", "buffering": 1},
2139 {"mode": "w+", "buffering": 2},
2140 {"mode": "w+b", "buffering": 0},
2141 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002142 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002143 f.close()
2144 self.assertRaises(ValueError, f.flush)
2145 self.assertRaises(ValueError, f.fileno)
2146 self.assertRaises(ValueError, f.isatty)
2147 self.assertRaises(ValueError, f.__iter__)
2148 if hasattr(f, "peek"):
2149 self.assertRaises(ValueError, f.peek, 1)
2150 self.assertRaises(ValueError, f.read)
2151 if hasattr(f, "read1"):
2152 self.assertRaises(ValueError, f.read1, 1024)
2153 if hasattr(f, "readinto"):
2154 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2155 self.assertRaises(ValueError, f.readline)
2156 self.assertRaises(ValueError, f.readlines)
2157 self.assertRaises(ValueError, f.seek, 0)
2158 self.assertRaises(ValueError, f.tell)
2159 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002160 self.assertRaises(ValueError, f.write,
2161 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002162 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002163 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002164
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002165 def test_blockingioerror(self):
2166 # Various BlockingIOError issues
2167 self.assertRaises(TypeError, self.BlockingIOError)
2168 self.assertRaises(TypeError, self.BlockingIOError, 1)
2169 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2170 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2171 b = self.BlockingIOError(1, "")
2172 self.assertEqual(b.characters_written, 0)
2173 class C(str):
2174 pass
2175 c = C("")
2176 b = self.BlockingIOError(1, c)
2177 c.b = b
2178 b.c = c
2179 wr = weakref.ref(c)
2180 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002181 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002182 self.assert_(wr() is None, wr)
2183
2184 def test_abcs(self):
2185 # Test the visible base classes are ABCs.
2186 self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
2187 self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
2188 self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
2189 self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))
2190
2191 def _check_abc_inheritance(self, abcmodule):
2192 with self.open(support.TESTFN, "wb", buffering=0) as f:
2193 self.assertTrue(isinstance(f, abcmodule.IOBase))
2194 self.assertTrue(isinstance(f, abcmodule.RawIOBase))
2195 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2196 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2197 with self.open(support.TESTFN, "wb") as f:
2198 self.assertTrue(isinstance(f, abcmodule.IOBase))
2199 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2200 self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
2201 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2202 with self.open(support.TESTFN, "w") as f:
2203 self.assertTrue(isinstance(f, abcmodule.IOBase))
2204 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2205 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2206 self.assertTrue(isinstance(f, abcmodule.TextIOBase))
2207
2208 def test_abc_inheritance(self):
2209 # Test implementations inherit from their respective ABCs
2210 self._check_abc_inheritance(self)
2211
2212 def test_abc_inheritance_official(self):
2213 # Test implementations inherit from the official ABCs of the
2214 # baseline "io" module.
2215 self._check_abc_inheritance(io)
2216
2217class CMiscIOTest(MiscIOTest):
2218 io = io
2219
2220class PyMiscIOTest(MiscIOTest):
2221 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002222
Guido van Rossum28524c72007-02-27 05:47:44 +00002223def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002224 tests = (CIOTest, PyIOTest,
2225 CBufferedReaderTest, PyBufferedReaderTest,
2226 CBufferedWriterTest, PyBufferedWriterTest,
2227 CBufferedRWPairTest, PyBufferedRWPairTest,
2228 CBufferedRandomTest, PyBufferedRandomTest,
2229 StatefulIncrementalDecoderTest,
2230 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2231 CTextIOWrapperTest, PyTextIOWrapperTest,
2232 CMiscIOTest, PyMiscIOTest,)
2233
2234 # Put the namespaces of the IO module we are testing and some useful mock
2235 # classes in the __dict__ of each test.
2236 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
2237 MockNonBlockWriterIO)
2238 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2239 c_io_ns = {name : getattr(io, name) for name in all_members}
2240 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2241 globs = globals()
2242 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2243 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2244 # Avoid turning open into a bound method.
2245 py_io_ns["open"] = pyio.OpenWrapper
2246 for test in tests:
2247 if test.__name__.startswith("C"):
2248 for name, obj in c_io_ns.items():
2249 setattr(test, name, obj)
2250 elif test.__name__.startswith("Py"):
2251 for name, obj in py_io_ns.items():
2252 setattr(test, name, obj)
2253
2254 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002255
2256if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002257 test_main()