blob: 745971464255175ed09b091f6e45c7bf319f736b [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
33from itertools import chain, cycle, count
34from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000035from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000036
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000037import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000038import io # C implementation of io
39import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000040
Guido van Rossuma9e20242007-03-08 00:43:48 +000041
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042def _default_chunk_size():
43 """Get the default TextIOWrapper chunk size"""
44 with open(__file__, "r", encoding="latin1") as f:
45 return f._CHUNK_SIZE
46
47
48class MockRawIO:
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000050 def __init__(self, read_stack=()):
51 self._read_stack = list(read_stack)
52 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000053 self._reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000054
55 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000056 self._reads += 1
Guido van Rossum68bbcd22007-02-27 17:19:33 +000057 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000058 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000059 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000060 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000061
Guido van Rossum01a27522007-03-07 01:00:12 +000062 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000063 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000064 return len(b)
65
66 def writable(self):
67 return True
68
Guido van Rossum68bbcd22007-02-27 17:19:33 +000069 def fileno(self):
70 return 42
71
72 def readable(self):
73 return True
74
Guido van Rossum01a27522007-03-07 01:00:12 +000075 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000076 return True
77
Guido van Rossum01a27522007-03-07 01:00:12 +000078 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000079 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000080
81 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000082 return 0 # same comment as above
83
84 def readinto(self, buf):
85 self._reads += 1
86 max_len = len(buf)
87 try:
88 data = self._read_stack[0]
89 except IndexError:
90 return 0
91 if data is None:
92 del self._read_stack[0]
93 return None
94 n = len(data)
95 if len(data) <= max_len:
96 del self._read_stack[0]
97 buf[:n] = data
98 return n
99 else:
100 buf[:] = data[:max_len]
101 self._read_stack[0] = data[max_len:]
102 return max_len
103
104 def truncate(self, pos=None):
105 return pos
106
107class CMockRawIO(MockRawIO, io.RawIOBase):
108 pass
109
110class PyMockRawIO(MockRawIO, pyio.RawIOBase):
111 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000112
Guido van Rossuma9e20242007-03-08 00:43:48 +0000113
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000114class MisbehavedRawIO(MockRawIO):
115 def write(self, b):
116 return super().write(b) * 2
117
118 def read(self, n=None):
119 return super().read(n) * 2
120
121 def seek(self, pos, whence):
122 return -123
123
124 def tell(self):
125 return -456
126
127 def readinto(self, buf):
128 super().readinto(buf)
129 return len(buf) * 5
130
131class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
132 pass
133
134class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
135 pass
136
137
138class CloseFailureIO(MockRawIO):
139 closed = 0
140
141 def close(self):
142 if not self.closed:
143 self.closed = 1
144 raise IOError
145
146class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
147 pass
148
149class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
150 pass
151
152
153class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000154
155 def __init__(self, data):
156 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000157 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000158
159 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000160 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000161 self.read_history.append(None if res is None else len(res))
162 return res
163
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000164 def readinto(self, b):
165 res = super().readinto(b)
166 self.read_history.append(res)
167 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000168
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169class CMockFileIO(MockFileIO, io.BytesIO):
170 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000171
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000172class PyMockFileIO(MockFileIO, pyio.BytesIO):
173 pass
174
175
176class MockNonBlockWriterIO:
177
178 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000179 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000181
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000182 def pop_written(self):
183 s = b"".join(self._write_stack)
184 self._write_stack[:] = []
185 return s
186
187 def block_on(self, char):
188 """Block when a given char is encountered."""
189 self._blocker_char = char
190
191 def readable(self):
192 return True
193
194 def seekable(self):
195 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000196
Guido van Rossum01a27522007-03-07 01:00:12 +0000197 def writable(self):
198 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000200 def write(self, b):
201 b = bytes(b)
202 n = -1
203 if self._blocker_char:
204 try:
205 n = b.index(self._blocker_char)
206 except ValueError:
207 pass
208 else:
209 self._blocker_char = None
210 self._write_stack.append(b[:n])
211 raise self.BlockingIOError(0, "test blocking", n)
212 self._write_stack.append(b)
213 return len(b)
214
215class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
216 BlockingIOError = io.BlockingIOError
217
218class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
219 BlockingIOError = pyio.BlockingIOError
220
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
Guido van Rossum28524c72007-02-27 05:47:44 +0000222class IOTest(unittest.TestCase):
223
Neal Norwitze7789b12008-03-24 06:18:09 +0000224 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000225 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000226
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000227 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000228 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000229
Guido van Rossum28524c72007-02-27 05:47:44 +0000230 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000231 self.assertEqual(f.write(b"blah."), 5)
232 self.assertEqual(f.seek(0), 0)
233 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000234 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000235 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000236 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000237 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000238 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000239 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000240 self.assertEqual(f.seek(-1, 2), 13)
241 self.assertEqual(f.tell(), 13)
242 self.assertEqual(f.truncate(12), 12)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000243 self.assertEqual(f.tell(), 12)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000244 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000245
Guido van Rossum9b76da62007-04-11 01:09:03 +0000246 def read_ops(self, f, buffered=False):
247 data = f.read(5)
248 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000249 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000250 self.assertEqual(f.readinto(data), 5)
251 self.assertEqual(data, b" worl")
252 self.assertEqual(f.readinto(data), 2)
253 self.assertEqual(len(data), 5)
254 self.assertEqual(data[:2], b"d\n")
255 self.assertEqual(f.seek(0), 0)
256 self.assertEqual(f.read(20), b"hello world\n")
257 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000258 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000259 self.assertEqual(f.seek(-6, 2), 6)
260 self.assertEqual(f.read(5), b"world")
261 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000262 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000263 self.assertEqual(f.seek(-6, 1), 5)
264 self.assertEqual(f.read(5), b" worl")
265 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000266 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000267 if buffered:
268 f.seek(0)
269 self.assertEqual(f.read(), b"hello world\n")
270 f.seek(6)
271 self.assertEqual(f.read(), b"world\n")
272 self.assertEqual(f.read(), b"")
273
Guido van Rossum34d69e52007-04-10 20:08:41 +0000274 LARGE = 2**31
275
Guido van Rossum53807da2007-04-10 19:01:47 +0000276 def large_file_ops(self, f):
277 assert f.readable()
278 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000279 self.assertEqual(f.seek(self.LARGE), self.LARGE)
280 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000281 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000282 self.assertEqual(f.tell(), self.LARGE + 3)
283 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000285 self.assertEqual(f.tell(), self.LARGE + 2)
286 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000287 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000288 self.assertEqual(f.tell(), self.LARGE + 1)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000289 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
290 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000291 self.assertEqual(f.read(2), b"x")
292
Guido van Rossum28524c72007-02-27 05:47:44 +0000293 def test_raw_file_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000294 f = self.open(support.TESTFN, "wb", buffering=0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000295 self.assertEqual(f.readable(), False)
296 self.assertEqual(f.writable(), True)
297 self.assertEqual(f.seekable(), True)
298 self.write_ops(f)
299 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000300 f = self.open(support.TESTFN, "rb", buffering=0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000301 self.assertEqual(f.readable(), True)
302 self.assertEqual(f.writable(), False)
303 self.assertEqual(f.seekable(), True)
304 self.read_ops(f)
305 f.close()
306
Guido van Rossum87429772007-04-10 21:06:59 +0000307 def test_buffered_file_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000308 f = self.open(support.TESTFN, "wb")
Guido van Rossum87429772007-04-10 21:06:59 +0000309 self.assertEqual(f.readable(), False)
310 self.assertEqual(f.writable(), True)
311 self.assertEqual(f.seekable(), True)
312 self.write_ops(f)
313 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000314 f = self.open(support.TESTFN, "rb")
Guido van Rossum87429772007-04-10 21:06:59 +0000315 self.assertEqual(f.readable(), True)
316 self.assertEqual(f.writable(), False)
317 self.assertEqual(f.seekable(), True)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000318 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000319 f.close()
320
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000321 def test_readline(self):
Benjamin Petersonb01138a2009-04-24 22:59:52 +0000322 f = self.open(support.TESTFN, "wb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000323 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000324 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000325 f = self.open(support.TESTFN, "rb")
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000326 self.assertEqual(f.readline(), b"abc\n")
327 self.assertEqual(f.readline(10), b"def\n")
328 self.assertEqual(f.readline(2), b"xy")
329 self.assertEqual(f.readline(4), b"zzy\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000330 self.assertEqual(f.readline(), b"foo\x00bar\n")
331 self.assertEqual(f.readline(), b"another line")
Benjamin Petersonb01138a2009-04-24 22:59:52 +0000332 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000333 f.close()
Benjamin Petersonb01138a2009-04-24 22:59:52 +0000334 f = self.open(support.TESTFN, "r")
335 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000336
Guido van Rossum28524c72007-02-27 05:47:44 +0000337 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000338 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000339 self.write_ops(f)
340 data = f.getvalue()
341 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000342 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000343 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000344
Guido van Rossum53807da2007-04-10 19:01:47 +0000345 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000346 # On Windows and Mac OSX this test comsumes large resources; It takes
347 # a long time to build the >2GB file and takes >2GB of disk space
348 # therefore the resource must be enabled to run this test.
349 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000350 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000351 print("\nTesting large file ops skipped on %s." % sys.platform,
352 file=sys.stderr)
353 print("It requires %d bytes and a long time." % self.LARGE,
354 file=sys.stderr)
355 print("Use 'regrtest.py -u largefile test_io' to run it.",
356 file=sys.stderr)
357 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000358 with self.open(support.TESTFN, "w+b", 0) as f:
359 self.large_file_ops(f)
360 with self.open(support.TESTFN, "w+b") as f:
361 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000362
363 def test_with_open(self):
364 for bufsize in (0, 1, 100):
365 f = None
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000366 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000367 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000368 self.assertEqual(f.closed, True)
369 f = None
370 try:
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000371 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000372 1/0
373 except ZeroDivisionError:
374 self.assertEqual(f.closed, True)
375 else:
376 self.fail("1/0 didn't raise an exception")
377
Antoine Pitrou08838b62009-01-21 00:55:13 +0000378 # issue 5008
379 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000380 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000381 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000382 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000383 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000384 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000385 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000386 with self.open(support.TESTFN, "a") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000387 self.assert_(f.tell() > 0)
388
Guido van Rossum87429772007-04-10 21:06:59 +0000389 def test_destructor(self):
390 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000391 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000392 def __del__(self):
393 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000394 try:
395 f = super().__del__
396 except AttributeError:
397 pass
398 else:
399 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000400 def close(self):
401 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000402 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000403 def flush(self):
404 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000405 super().flush()
406 f = MyFileIO(support.TESTFN, "wb")
407 f.write(b"xxx")
408 del f
409 self.assertEqual(record, [1, 2, 3])
410 f = open(support.TESTFN, "rb")
411 self.assertEqual(f.read(), b"xxx")
412
413 def _check_base_destructor(self, base):
414 record = []
415 class MyIO(base):
416 def __init__(self):
417 # This exercises the availability of attributes on object
418 # destruction.
419 # (in the C version, close() is called by the tp_dealloc
420 # function, not by __del__)
421 self.on_del = 1
422 self.on_close = 2
423 self.on_flush = 3
424 def __del__(self):
425 record.append(self.on_del)
426 try:
427 f = super().__del__
428 except AttributeError:
429 pass
430 else:
431 f()
432 def close(self):
433 record.append(self.on_close)
434 super().close()
435 def flush(self):
436 record.append(self.on_flush)
437 super().flush()
438 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000439 del f
440 self.assertEqual(record, [1, 2, 3])
441
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000442 def test_IOBase_destructor(self):
443 self._check_base_destructor(self.IOBase)
444
445 def test_RawIOBase_destructor(self):
446 self._check_base_destructor(self.RawIOBase)
447
448 def test_BufferedIOBase_destructor(self):
449 self._check_base_destructor(self.BufferedIOBase)
450
451 def test_TextIOBase_destructor(self):
452 self._check_base_destructor(self.TextIOBase)
453
Guido van Rossum87429772007-04-10 21:06:59 +0000454 def test_close_flushes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000455 f = self.open(support.TESTFN, "wb")
Guido van Rossum2b08b382007-05-08 20:18:39 +0000456 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000457 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000458 f = self.open(support.TESTFN, "rb")
Guido van Rossum87429772007-04-10 21:06:59 +0000459 self.assertEqual(f.read(), b"xxx")
460 f.close()
Guido van Rossuma9e20242007-03-08 00:43:48 +0000461
Guido van Rossumd4103952007-04-12 05:44:49 +0000462 def test_array_writes(self):
463 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000464 n = len(a.tostring())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000465 f = self.open(support.TESTFN, "wb", 0)
Guido van Rossumd4103952007-04-12 05:44:49 +0000466 self.assertEqual(f.write(a), n)
467 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000468 f = self.open(support.TESTFN, "wb")
Guido van Rossumd4103952007-04-12 05:44:49 +0000469 self.assertEqual(f.write(a), n)
470 f.close()
471
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000472 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000473 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000474 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000475
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000476 def test_read_closed(self):
477 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000478 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000479 with self.open(support.TESTFN, "r") as f:
480 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000481 self.assertEqual(file.read(), "egg\n")
482 file.seek(0)
483 file.close()
484 self.assertRaises(ValueError, file.read)
485
486 def test_no_closefd_with_filename(self):
487 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000488 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000489
490 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000491 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000492 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000493 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000494 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000495 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000496 self.assertEqual(file.buffer.raw.closefd, False)
497
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000498 def test_garbage_collection(self):
499 # FileIO objects are collected, and collecting them flushes
500 # all data to disk.
501 f = self.FileIO(support.TESTFN, "wb")
502 f.write(b"abcxxx")
503 f.f = f
504 wr = weakref.ref(f)
505 del f
506 gc.collect()
507 self.assert_(wr() is None, wr)
508 with open(support.TESTFN, "rb") as f:
509 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000510
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000511 def test_unbounded_file(self):
512 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
513 zero = "/dev/zero"
514 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000515 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000516 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000517 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000518 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000519 self.skipTest("test requires at least 2GB of memory")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000520 with open(zero, "rb", buffering=0) as f:
521 self.assertRaises(OverflowError, f.read)
522 with open(zero, "rb") as f:
523 self.assertRaises(OverflowError, f.read)
524 with open(zero, "r") as f:
525 self.assertRaises(OverflowError, f.read)
526
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000527class CIOTest(IOTest):
528 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000529
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000530class PyIOTest(IOTest):
531 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000532
Guido van Rossuma9e20242007-03-08 00:43:48 +0000533
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000534class CommonBufferedTests:
535 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
536
537 def test_fileno(self):
538 rawio = self.MockRawIO()
539 bufio = self.tp(rawio)
540
541 self.assertEquals(42, bufio.fileno())
542
543 def test_no_fileno(self):
544 # XXX will we always have fileno() function? If so, kill
545 # this test. Else, write it.
546 pass
547
548 def test_invalid_args(self):
549 rawio = self.MockRawIO()
550 bufio = self.tp(rawio)
551 # Invalid whence
552 self.assertRaises(ValueError, bufio.seek, 0, -1)
553 self.assertRaises(ValueError, bufio.seek, 0, 3)
554
555 def test_override_destructor(self):
556 tp = self.tp
557 record = []
558 class MyBufferedIO(tp):
559 def __del__(self):
560 record.append(1)
561 try:
562 f = super().__del__
563 except AttributeError:
564 pass
565 else:
566 f()
567 def close(self):
568 record.append(2)
569 super().close()
570 def flush(self):
571 record.append(3)
572 super().flush()
573 rawio = self.MockRawIO()
574 bufio = MyBufferedIO(rawio)
575 writable = bufio.writable()
576 del bufio
577 if writable:
578 self.assertEqual(record, [1, 2, 3])
579 else:
580 self.assertEqual(record, [1, 2])
581
582 def test_context_manager(self):
583 # Test usability as a context manager
584 rawio = self.MockRawIO()
585 bufio = self.tp(rawio)
586 def _with():
587 with bufio:
588 pass
589 _with()
590 # bufio should now be closed, and using it a second time should raise
591 # a ValueError.
592 self.assertRaises(ValueError, _with)
593
594 def test_error_through_destructor(self):
595 # Test that the exception state is not modified by a destructor,
596 # even if close() fails.
597 rawio = self.CloseFailureIO()
598 def f():
599 self.tp(rawio).xyzzy
600 with support.captured_output("stderr") as s:
601 self.assertRaises(AttributeError, f)
602 s = s.getvalue().strip()
603 if s:
604 # The destructor *may* have printed an unraisable error, check it
605 self.assertEqual(len(s.splitlines()), 1)
606 self.assert_(s.startswith("Exception IOError: "), s)
607 self.assert_(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000608
609
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000610class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
611 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000612
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000613 def test_constructor(self):
614 rawio = self.MockRawIO([b"abc"])
615 bufio = self.tp(rawio)
616 bufio.__init__(rawio)
617 bufio.__init__(rawio, buffer_size=1024)
618 bufio.__init__(rawio, buffer_size=16)
619 self.assertEquals(b"abc", bufio.read())
620 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
621 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
622 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
623 rawio = self.MockRawIO([b"abc"])
624 bufio.__init__(rawio)
625 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000626
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000627 def test_read(self):
628 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
629 bufio = self.tp(rawio)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000630 self.assertEquals(b"abcdef", bufio.read(6))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000631 # Invalid args
632 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000633
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000634 def test_read1(self):
635 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
636 bufio = self.tp(rawio)
637 self.assertEquals(b"a", bufio.read(1))
638 self.assertEquals(b"b", bufio.read1(1))
639 self.assertEquals(rawio._reads, 1)
640 self.assertEquals(b"c", bufio.read1(100))
641 self.assertEquals(rawio._reads, 1)
642 self.assertEquals(b"d", bufio.read1(100))
643 self.assertEquals(rawio._reads, 2)
644 self.assertEquals(b"efg", bufio.read1(100))
645 self.assertEquals(rawio._reads, 3)
646 self.assertEquals(b"", bufio.read1(100))
647 # Invalid args
648 self.assertRaises(ValueError, bufio.read1, -1)
649
650 def test_readinto(self):
651 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
652 bufio = self.tp(rawio)
653 b = bytearray(2)
654 self.assertEquals(bufio.readinto(b), 2)
655 self.assertEquals(b, b"ab")
656 self.assertEquals(bufio.readinto(b), 2)
657 self.assertEquals(b, b"cd")
658 self.assertEquals(bufio.readinto(b), 2)
659 self.assertEquals(b, b"ef")
660 self.assertEquals(bufio.readinto(b), 1)
661 self.assertEquals(b, b"gf")
662 self.assertEquals(bufio.readinto(b), 0)
663 self.assertEquals(b, b"gf")
664
665 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000666 data = b"abcdefghi"
667 dlen = len(data)
668
669 tests = [
670 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
671 [ 100, [ 3, 3, 3], [ dlen ] ],
672 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
673 ]
674
675 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000676 rawio = self.MockFileIO(data)
677 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000678 pos = 0
679 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000680 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000681 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000682 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000683 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000684
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000685 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000686 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000687 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
688 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000689
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000690 self.assertEquals(b"abcd", bufio.read(6))
691 self.assertEquals(b"e", bufio.read(1))
692 self.assertEquals(b"fg", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000693 self.assertEquals(b"", bufio.peek(1))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000694 self.assert_(None is bufio.read())
695 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000696
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000697 def test_read_past_eof(self):
698 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
699 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000700
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000701 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000702
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000703 def test_read_all(self):
704 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
705 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000706
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000707 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000708
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000709 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000710 try:
711 # Write out many bytes with exactly the same number of 0's,
712 # 1's... 255's. This will help us check that concurrent reading
713 # doesn't duplicate or forget contents.
714 N = 1000
715 l = list(range(256)) * N
716 random.shuffle(l)
717 s = bytes(bytearray(l))
718 with io.open(support.TESTFN, "wb") as f:
719 f.write(s)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000720 with io.open(support.TESTFN, self.read_mode, buffering=0) as raw:
721 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000722 errors = []
723 results = []
724 def f():
725 try:
726 # Intra-buffer read then buffer-flushing read
727 for n in cycle([1, 19]):
728 s = bufio.read(n)
729 if not s:
730 break
731 # list.append() is atomic
732 results.append(s)
733 except Exception as e:
734 errors.append(e)
735 raise
736 threads = [threading.Thread(target=f) for x in range(20)]
737 for t in threads:
738 t.start()
739 time.sleep(0.02) # yield
740 for t in threads:
741 t.join()
742 self.assertFalse(errors,
743 "the following exceptions were caught: %r" % errors)
744 s = b''.join(results)
745 for i in range(256):
746 c = bytes(bytearray([i]))
747 self.assertEqual(s.count(c), N)
748 finally:
749 support.unlink(support.TESTFN)
750
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000751 def test_misbehaved_io(self):
752 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
753 bufio = self.tp(rawio)
754 self.assertRaises(IOError, bufio.seek, 0)
755 self.assertRaises(IOError, bufio.tell)
756
757class CBufferedReaderTest(BufferedReaderTest):
758 tp = io.BufferedReader
759
760 def test_constructor(self):
761 BufferedReaderTest.test_constructor(self)
762 # The allocation can succeed on 32-bit builds, e.g. with more
763 # than 2GB RAM and a 64-bit kernel.
764 if sys.maxsize > 0x7FFFFFFF:
765 rawio = self.MockRawIO()
766 bufio = self.tp(rawio)
767 self.assertRaises((OverflowError, MemoryError, ValueError),
768 bufio.__init__, rawio, sys.maxsize)
769
770 def test_initialization(self):
771 rawio = self.MockRawIO([b"abc"])
772 bufio = self.tp(rawio)
773 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
774 self.assertRaises(ValueError, bufio.read)
775 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
776 self.assertRaises(ValueError, bufio.read)
777 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
778 self.assertRaises(ValueError, bufio.read)
779
780 def test_misbehaved_io_read(self):
781 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
782 bufio = self.tp(rawio)
783 # _pyio.BufferedReader seems to implement reading different, so that
784 # checking this is not so easy.
785 self.assertRaises(IOError, bufio.read, 10)
786
787 def test_garbage_collection(self):
788 # C BufferedReader objects are collected.
789 # The Python version has __del__, so it ends into gc.garbage instead
790 rawio = self.FileIO(support.TESTFN, "w+b")
791 f = self.tp(rawio)
792 f.f = f
793 wr = weakref.ref(f)
794 del f
795 gc.collect()
796 self.assert_(wr() is None, wr)
797
798class PyBufferedReaderTest(BufferedReaderTest):
799 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000800
Guido van Rossuma9e20242007-03-08 00:43:48 +0000801
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000802class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
803 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000804
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000805 def test_constructor(self):
806 rawio = self.MockRawIO()
807 bufio = self.tp(rawio)
808 bufio.__init__(rawio)
809 bufio.__init__(rawio, buffer_size=1024)
810 bufio.__init__(rawio, buffer_size=16)
811 self.assertEquals(3, bufio.write(b"abc"))
812 bufio.flush()
813 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
814 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
815 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
816 bufio.__init__(rawio)
817 self.assertEquals(3, bufio.write(b"ghi"))
818 bufio.flush()
819 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
820
821 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000822 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000823 writer = self.MockRawIO()
824 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000825 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000826 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000827
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000828 def test_write_overflow(self):
829 writer = self.MockRawIO()
830 bufio = self.tp(writer, 8)
831 contents = b"abcdefghijklmnop"
832 for n in range(0, len(contents), 3):
833 bufio.write(contents[n:n+3])
834 flushed = b"".join(writer._write_stack)
835 # At least (total - 8) bytes were implicitly flushed, perhaps more
836 # depending on the implementation.
837 self.assert_(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000838
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000839 def check_writes(self, intermediate_func):
840 # Lots of writes, test the flushed output is as expected.
841 contents = bytes(range(256)) * 1000
842 n = 0
843 writer = self.MockRawIO()
844 bufio = self.tp(writer, 13)
845 # Generator of write sizes: repeat each N 15 times then proceed to N+1
846 def gen_sizes():
847 for size in count(1):
848 for i in range(15):
849 yield size
850 sizes = gen_sizes()
851 while n < len(contents):
852 size = min(next(sizes), len(contents) - n)
853 self.assertEquals(bufio.write(contents[n:n+size]), size)
854 intermediate_func(bufio)
855 n += size
856 bufio.flush()
857 self.assertEquals(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000858
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000859 def test_writes(self):
860 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000861
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000862 def test_writes_and_flushes(self):
863 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +0000864
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000865 def test_writes_and_seeks(self):
866 def _seekabs(bufio):
867 pos = bufio.tell()
868 bufio.seek(pos + 1, 0)
869 bufio.seek(pos - 1, 0)
870 bufio.seek(pos, 0)
871 self.check_writes(_seekabs)
872 def _seekrel(bufio):
873 pos = bufio.seek(0, 1)
874 bufio.seek(+1, 1)
875 bufio.seek(-1, 1)
876 bufio.seek(pos, 0)
877 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +0000878
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000879 def test_writes_and_truncates(self):
880 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +0000881
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000882 def test_write_non_blocking(self):
883 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +0000884 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +0000885
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000886 self.assertEquals(bufio.write(b"abcd"), 4)
887 self.assertEquals(bufio.write(b"efghi"), 5)
888 # 1 byte will be written, the rest will be buffered
889 raw.block_on(b"k")
890 self.assertEquals(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +0000891
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000892 # 8 bytes will be written, 8 will be buffered and the rest will be lost
893 raw.block_on(b"0")
894 try:
895 bufio.write(b"opqrwxyz0123456789")
896 except self.BlockingIOError as e:
897 written = e.characters_written
898 else:
899 self.fail("BlockingIOError should have been raised")
900 self.assertEquals(written, 16)
901 self.assertEquals(raw.pop_written(),
902 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +0000903
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000904 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
905 s = raw.pop_written()
906 # Previously buffered bytes were flushed
907 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +0000908
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000909 def test_write_and_rewind(self):
910 raw = io.BytesIO()
911 bufio = self.tp(raw, 4)
912 self.assertEqual(bufio.write(b"abcdef"), 6)
913 self.assertEqual(bufio.tell(), 6)
914 bufio.seek(0, 0)
915 self.assertEqual(bufio.write(b"XY"), 2)
916 bufio.seek(6, 0)
917 self.assertEqual(raw.getvalue(), b"XYcdef")
918 self.assertEqual(bufio.write(b"123456"), 6)
919 bufio.flush()
920 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000921
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000922 def test_flush(self):
923 writer = self.MockRawIO()
924 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000925 bufio.write(b"abc")
926 bufio.flush()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000927 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000928
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000929 def test_destructor(self):
930 writer = self.MockRawIO()
931 bufio = self.tp(writer, 8)
932 bufio.write(b"abc")
933 del bufio
934 self.assertEquals(b"abc", writer._write_stack[0])
935
936 def test_truncate(self):
937 # Truncate implicitly flushes the buffer.
938 with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
939 bufio = self.tp(raw, 8)
940 bufio.write(b"abcdef")
941 self.assertEqual(bufio.truncate(3), 3)
942 self.assertEqual(bufio.tell(), 3)
943 with io.open(support.TESTFN, "rb", buffering=0) as f:
944 self.assertEqual(f.read(), b"abc")
945
946 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000947 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000948 # Write out many bytes from many threads and test they were
949 # all flushed.
950 N = 1000
951 contents = bytes(range(256)) * N
952 sizes = cycle([1, 19])
953 n = 0
954 queue = deque()
955 while n < len(contents):
956 size = next(sizes)
957 queue.append(contents[n:n+size])
958 n += size
959 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +0000960 # We use a real file object because it allows us to
961 # exercise situations where the GIL is released before
962 # writing the buffer to the raw streams. This is in addition
963 # to concurrency issues due to switching threads in the middle
964 # of Python code.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000965 with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
966 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000967 errors = []
968 def f():
969 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000970 while True:
971 try:
972 s = queue.popleft()
973 except IndexError:
974 return
Antoine Pitrou87695762008-08-14 22:44:29 +0000975 bufio.write(s)
976 except Exception as e:
977 errors.append(e)
978 raise
979 threads = [threading.Thread(target=f) for x in range(20)]
980 for t in threads:
981 t.start()
982 time.sleep(0.02) # yield
983 for t in threads:
984 t.join()
985 self.assertFalse(errors,
986 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000987 bufio.close()
988 with io.open(support.TESTFN, "rb") as f:
989 s = f.read()
990 for i in range(256):
991 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +0000992 finally:
993 support.unlink(support.TESTFN)
994
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000995 def test_misbehaved_io(self):
996 rawio = self.MisbehavedRawIO()
997 bufio = self.tp(rawio, 5)
998 self.assertRaises(IOError, bufio.seek, 0)
999 self.assertRaises(IOError, bufio.tell)
1000 self.assertRaises(IOError, bufio.write, b"abcdef")
1001
Benjamin Peterson59406a92009-03-26 17:10:29 +00001002 def test_max_buffer_size_deprecation(self):
1003 with support.check_warnings() as w:
1004 warnings.simplefilter("always", DeprecationWarning)
1005 self.tp(self.MockRawIO(), 8, 12)
1006 self.assertEqual(len(w.warnings), 1)
1007 warning = w.warnings[0]
1008 self.assertTrue(warning.category is DeprecationWarning)
1009 self.assertEqual(str(warning.message),
1010 "max_buffer_size is deprecated")
1011
1012
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001013class CBufferedWriterTest(BufferedWriterTest):
1014 tp = io.BufferedWriter
1015
1016 def test_constructor(self):
1017 BufferedWriterTest.test_constructor(self)
1018 # The allocation can succeed on 32-bit builds, e.g. with more
1019 # than 2GB RAM and a 64-bit kernel.
1020 if sys.maxsize > 0x7FFFFFFF:
1021 rawio = self.MockRawIO()
1022 bufio = self.tp(rawio)
1023 self.assertRaises((OverflowError, MemoryError, ValueError),
1024 bufio.__init__, rawio, sys.maxsize)
1025
1026 def test_initialization(self):
1027 rawio = self.MockRawIO()
1028 bufio = self.tp(rawio)
1029 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1030 self.assertRaises(ValueError, bufio.write, b"def")
1031 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1032 self.assertRaises(ValueError, bufio.write, b"def")
1033 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1034 self.assertRaises(ValueError, bufio.write, b"def")
1035
1036 def test_garbage_collection(self):
1037 # C BufferedWriter objects are collected, and collecting them flushes
1038 # all data to disk.
1039 # The Python version has __del__, so it ends into gc.garbage instead
1040 rawio = self.FileIO(support.TESTFN, "w+b")
1041 f = self.tp(rawio)
1042 f.write(b"123xxx")
1043 f.x = f
1044 wr = weakref.ref(f)
1045 del f
1046 gc.collect()
1047 self.assert_(wr() is None, wr)
1048 with open(support.TESTFN, "rb") as f:
1049 self.assertEqual(f.read(), b"123xxx")
1050
1051
1052class PyBufferedWriterTest(BufferedWriterTest):
1053 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001054
Guido van Rossum01a27522007-03-07 01:00:12 +00001055class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001056
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001057 def test_constructor(self):
1058 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001059 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001060
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001061 def test_constructor_max_buffer_size_deprecation(self):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001062 with support.check_warnings() as w:
1063 warnings.simplefilter("always", DeprecationWarning)
1064 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1065 self.assertEqual(len(w.warnings), 1)
1066 warning = w.warnings[0]
1067 self.assertTrue(warning.category is DeprecationWarning)
1068 self.assertEqual(str(warning.message),
1069 "max_buffer_size is deprecated")
1070
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001071 def test_constructor_with_not_readable(self):
1072 class NotReadable(MockRawIO):
1073 def readable(self):
1074 return False
1075
1076 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1077
1078 def test_constructor_with_not_writeable(self):
1079 class NotWriteable(MockRawIO):
1080 def writable(self):
1081 return False
1082
1083 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1084
1085 def test_read(self):
1086 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1087
1088 self.assertEqual(pair.read(3), b"abc")
1089 self.assertEqual(pair.read(1), b"d")
1090 self.assertEqual(pair.read(), b"ef")
1091
1092 def test_read1(self):
1093 # .read1() is delegated to the underlying reader object, so this test
1094 # can be shallow.
1095 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1096
1097 self.assertEqual(pair.read1(3), b"abc")
1098
1099 def test_readinto(self):
1100 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1101
1102 data = bytearray(5)
1103 self.assertEqual(pair.readinto(data), 5)
1104 self.assertEqual(data, b"abcde")
1105
1106 def test_write(self):
1107 w = self.MockRawIO()
1108 pair = self.tp(self.MockRawIO(), w)
1109
1110 pair.write(b"abc")
1111 pair.flush()
1112 pair.write(b"def")
1113 pair.flush()
1114 self.assertEqual(w._write_stack, [b"abc", b"def"])
1115
1116 def test_peek(self):
1117 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1118
1119 self.assertTrue(pair.peek(3).startswith(b"abc"))
1120 self.assertEqual(pair.read(3), b"abc")
1121
1122 def test_readable(self):
1123 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1124 self.assertTrue(pair.readable())
1125
1126 def test_writeable(self):
1127 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1128 self.assertTrue(pair.writable())
1129
1130 def test_seekable(self):
1131 # BufferedRWPairs are never seekable, even if their readers and writers
1132 # are.
1133 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1134 self.assertFalse(pair.seekable())
1135
1136 # .flush() is delegated to the underlying writer object and has been
1137 # tested in the test_write method.
1138
1139 def test_close_and_closed(self):
1140 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1141 self.assertFalse(pair.closed)
1142 pair.close()
1143 self.assertTrue(pair.closed)
1144
1145 def test_isatty(self):
1146 class SelectableIsAtty(MockRawIO):
1147 def __init__(self, isatty):
1148 MockRawIO.__init__(self)
1149 self._isatty = isatty
1150
1151 def isatty(self):
1152 return self._isatty
1153
1154 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1155 self.assertFalse(pair.isatty())
1156
1157 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1158 self.assertTrue(pair.isatty())
1159
1160 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1161 self.assertTrue(pair.isatty())
1162
1163 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1164 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001165
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001166class CBufferedRWPairTest(BufferedRWPairTest):
1167 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001168
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001169class PyBufferedRWPairTest(BufferedRWPairTest):
1170 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001171
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001172
1173class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1174 read_mode = "rb+"
1175 write_mode = "wb+"
1176
1177 def test_constructor(self):
1178 BufferedReaderTest.test_constructor(self)
1179 BufferedWriterTest.test_constructor(self)
1180
1181 def test_read_and_write(self):
1182 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001183 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001184
1185 self.assertEqual(b"as", rw.read(2))
1186 rw.write(b"ddd")
1187 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001188 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001189 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001190 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001191
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001192 def test_seek_and_tell(self):
1193 raw = self.BytesIO(b"asdfghjkl")
1194 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001195
1196 self.assertEquals(b"as", rw.read(2))
1197 self.assertEquals(2, rw.tell())
1198 rw.seek(0, 0)
1199 self.assertEquals(b"asdf", rw.read(4))
1200
1201 rw.write(b"asdf")
1202 rw.seek(0, 0)
1203 self.assertEquals(b"asdfasdfl", rw.read())
1204 self.assertEquals(9, rw.tell())
1205 rw.seek(-4, 2)
1206 self.assertEquals(5, rw.tell())
1207 rw.seek(2, 1)
1208 self.assertEquals(7, rw.tell())
1209 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001210 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001211
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001212 def check_flush_and_read(self, read_func):
1213 raw = self.BytesIO(b"abcdefghi")
1214 bufio = self.tp(raw)
1215
1216 self.assertEquals(b"ab", read_func(bufio, 2))
1217 bufio.write(b"12")
1218 self.assertEquals(b"ef", read_func(bufio, 2))
1219 self.assertEquals(6, bufio.tell())
1220 bufio.flush()
1221 self.assertEquals(6, bufio.tell())
1222 self.assertEquals(b"ghi", read_func(bufio))
1223 raw.seek(0, 0)
1224 raw.write(b"XYZ")
1225 # flush() resets the read buffer
1226 bufio.flush()
1227 bufio.seek(0, 0)
1228 self.assertEquals(b"XYZ", read_func(bufio, 3))
1229
1230 def test_flush_and_read(self):
1231 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1232
1233 def test_flush_and_readinto(self):
1234 def _readinto(bufio, n=-1):
1235 b = bytearray(n if n >= 0 else 9999)
1236 n = bufio.readinto(b)
1237 return bytes(b[:n])
1238 self.check_flush_and_read(_readinto)
1239
1240 def test_flush_and_peek(self):
1241 def _peek(bufio, n=-1):
1242 # This relies on the fact that the buffer can contain the whole
1243 # raw stream, otherwise peek() can return less.
1244 b = bufio.peek(n)
1245 if n != -1:
1246 b = b[:n]
1247 bufio.seek(len(b), 1)
1248 return b
1249 self.check_flush_and_read(_peek)
1250
1251 def test_flush_and_write(self):
1252 raw = self.BytesIO(b"abcdefghi")
1253 bufio = self.tp(raw)
1254
1255 bufio.write(b"123")
1256 bufio.flush()
1257 bufio.write(b"45")
1258 bufio.flush()
1259 bufio.seek(0, 0)
1260 self.assertEquals(b"12345fghi", raw.getvalue())
1261 self.assertEquals(b"12345fghi", bufio.read())
1262
1263 def test_threads(self):
1264 BufferedReaderTest.test_threads(self)
1265 BufferedWriterTest.test_threads(self)
1266
1267 def test_writes_and_peek(self):
1268 def _peek(bufio):
1269 bufio.peek(1)
1270 self.check_writes(_peek)
1271 def _peek(bufio):
1272 pos = bufio.tell()
1273 bufio.seek(-1, 1)
1274 bufio.peek(1)
1275 bufio.seek(pos, 0)
1276 self.check_writes(_peek)
1277
1278 def test_writes_and_reads(self):
1279 def _read(bufio):
1280 bufio.seek(-1, 1)
1281 bufio.read(1)
1282 self.check_writes(_read)
1283
1284 def test_writes_and_read1s(self):
1285 def _read1(bufio):
1286 bufio.seek(-1, 1)
1287 bufio.read1(1)
1288 self.check_writes(_read1)
1289
1290 def test_writes_and_readintos(self):
1291 def _read(bufio):
1292 bufio.seek(-1, 1)
1293 bufio.readinto(bytearray(1))
1294 self.check_writes(_read)
1295
1296 def test_misbehaved_io(self):
1297 BufferedReaderTest.test_misbehaved_io(self)
1298 BufferedWriterTest.test_misbehaved_io(self)
1299
1300class CBufferedRandomTest(BufferedRandomTest):
1301 tp = io.BufferedRandom
1302
1303 def test_constructor(self):
1304 BufferedRandomTest.test_constructor(self)
1305 # The allocation can succeed on 32-bit builds, e.g. with more
1306 # than 2GB RAM and a 64-bit kernel.
1307 if sys.maxsize > 0x7FFFFFFF:
1308 rawio = self.MockRawIO()
1309 bufio = self.tp(rawio)
1310 self.assertRaises((OverflowError, MemoryError, ValueError),
1311 bufio.__init__, rawio, sys.maxsize)
1312
1313 def test_garbage_collection(self):
1314 CBufferedReaderTest.test_garbage_collection(self)
1315 CBufferedWriterTest.test_garbage_collection(self)
1316
1317class PyBufferedRandomTest(BufferedRandomTest):
1318 tp = pyio.BufferedRandom
1319
1320
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001321# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1322# properties:
1323# - A single output character can correspond to many bytes of input.
1324# - The number of input bytes to complete the character can be
1325# undetermined until the last input byte is received.
1326# - The number of input bytes can vary depending on previous input.
1327# - A single input byte can correspond to many characters of output.
1328# - The number of output characters can be undetermined until the
1329# last input byte is received.
1330# - The number of output characters can vary depending on previous input.
1331
1332class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1333 """
1334 For testing seek/tell behavior with a stateful, buffering decoder.
1335
1336 Input is a sequence of words. Words may be fixed-length (length set
1337 by input) or variable-length (period-terminated). In variable-length
1338 mode, extra periods are ignored. Possible words are:
1339 - 'i' followed by a number sets the input length, I (maximum 99).
1340 When I is set to 0, words are space-terminated.
1341 - 'o' followed by a number sets the output length, O (maximum 99).
1342 - Any other word is converted into a word followed by a period on
1343 the output. The output word consists of the input word truncated
1344 or padded out with hyphens to make its length equal to O. If O
1345 is 0, the word is output verbatim without truncating or padding.
1346 I and O are initially set to 1. When I changes, any buffered input is
1347 re-scanned according to the new I. EOF also terminates the last word.
1348 """
1349
1350 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001351 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001352 self.reset()
1353
1354 def __repr__(self):
1355 return '<SID %x>' % id(self)
1356
1357 def reset(self):
1358 self.i = 1
1359 self.o = 1
1360 self.buffer = bytearray()
1361
1362 def getstate(self):
1363 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1364 return bytes(self.buffer), i*100 + o
1365
1366 def setstate(self, state):
1367 buffer, io = state
1368 self.buffer = bytearray(buffer)
1369 i, o = divmod(io, 100)
1370 self.i, self.o = i ^ 1, o ^ 1
1371
1372 def decode(self, input, final=False):
1373 output = ''
1374 for b in input:
1375 if self.i == 0: # variable-length, terminated with period
1376 if b == ord('.'):
1377 if self.buffer:
1378 output += self.process_word()
1379 else:
1380 self.buffer.append(b)
1381 else: # fixed-length, terminate after self.i bytes
1382 self.buffer.append(b)
1383 if len(self.buffer) == self.i:
1384 output += self.process_word()
1385 if final and self.buffer: # EOF terminates the last word
1386 output += self.process_word()
1387 return output
1388
1389 def process_word(self):
1390 output = ''
1391 if self.buffer[0] == ord('i'):
1392 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1393 elif self.buffer[0] == ord('o'):
1394 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1395 else:
1396 output = self.buffer.decode('ascii')
1397 if len(output) < self.o:
1398 output += '-'*self.o # pad out with hyphens
1399 if self.o:
1400 output = output[:self.o] # truncate to output length
1401 output += '.'
1402 self.buffer = bytearray()
1403 return output
1404
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001405 codecEnabled = False
1406
1407 @classmethod
1408 def lookupTestDecoder(cls, name):
1409 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001410 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001411 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001412 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001413 incrementalencoder=None,
1414 streamreader=None, streamwriter=None,
1415 incrementaldecoder=cls)
1416
1417# Register the previous decoder for testing.
1418# Disabled by default, tests will enable it.
1419codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1420
1421
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001422class StatefulIncrementalDecoderTest(unittest.TestCase):
1423 """
1424 Make sure the StatefulIncrementalDecoder actually works.
1425 """
1426
1427 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001428 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001429 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001430 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001431 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001432 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001433 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001434 # I=0, O=6 (variable-length input, fixed-length output)
1435 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1436 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001437 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001438 # I=6, O=3 (fixed-length input > fixed-length output)
1439 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1440 # I=0, then 3; O=29, then 15 (with longer output)
1441 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1442 'a----------------------------.' +
1443 'b----------------------------.' +
1444 'cde--------------------------.' +
1445 'abcdefghijabcde.' +
1446 'a.b------------.' +
1447 '.c.------------.' +
1448 'd.e------------.' +
1449 'k--------------.' +
1450 'l--------------.' +
1451 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001452 ]
1453
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001454 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001455 # Try a few one-shot test cases.
1456 for input, eof, output in self.test_cases:
1457 d = StatefulIncrementalDecoder()
1458 self.assertEquals(d.decode(input, eof), output)
1459
1460 # Also test an unfinished decode, followed by forcing EOF.
1461 d = StatefulIncrementalDecoder()
1462 self.assertEquals(d.decode(b'oiabcd'), '')
1463 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001464
1465class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001466
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001467 def setUp(self):
1468 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1469 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001470 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001471
Guido van Rossumd0712812007-04-11 16:32:43 +00001472 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001473 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001474
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001475 def test_constructor(self):
1476 r = self.BytesIO(b"\xc3\xa9\n\n")
1477 b = self.BufferedReader(r, 1000)
1478 t = self.TextIOWrapper(b)
1479 t.__init__(b, encoding="latin1", newline="\r\n")
1480 self.assertEquals(t.encoding, "latin1")
1481 self.assertEquals(t.line_buffering, False)
1482 t.__init__(b, encoding="utf8", line_buffering=True)
1483 self.assertEquals(t.encoding, "utf8")
1484 self.assertEquals(t.line_buffering, True)
1485 self.assertEquals("\xe9\n", t.readline())
1486 self.assertRaises(TypeError, t.__init__, b, newline=42)
1487 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1488
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001489 def test_repr(self):
1490 raw = self.BytesIO("hello".encode("utf-8"))
1491 b = self.BufferedReader(raw)
1492 t = self.TextIOWrapper(b, encoding="utf-8")
1493 self.assertEqual(repr(t), "<TextIOWrapper encoding=utf-8>")
1494
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001495 def test_line_buffering(self):
1496 r = self.BytesIO()
1497 b = self.BufferedWriter(r, 1000)
1498 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001499 t.write("X")
1500 self.assertEquals(r.getvalue(), b"") # No flush happened
1501 t.write("Y\nZ")
1502 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1503 t.write("A\rB")
1504 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1505
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001506 def test_encoding(self):
1507 # Check the encoding attribute is always set, and valid
1508 b = self.BytesIO()
1509 t = self.TextIOWrapper(b, encoding="utf8")
1510 self.assertEqual(t.encoding, "utf8")
1511 t = self.TextIOWrapper(b)
1512 self.assert_(t.encoding is not None)
1513 codecs.lookup(t.encoding)
1514
1515 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001516 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001517 b = self.BytesIO(b"abc\n\xff\n")
1518 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001519 self.assertRaises(UnicodeError, t.read)
1520 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001521 b = self.BytesIO(b"abc\n\xff\n")
1522 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001523 self.assertRaises(UnicodeError, t.read)
1524 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001525 b = self.BytesIO(b"abc\n\xff\n")
1526 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001527 self.assertEquals(t.read(), "abc\n\n")
1528 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001529 b = self.BytesIO(b"abc\n\xff\n")
1530 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001531 self.assertEquals(t.read(), "abc\n\ufffd\n")
1532
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001533 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001534 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001535 b = self.BytesIO()
1536 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001537 self.assertRaises(UnicodeError, t.write, "\xff")
1538 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001539 b = self.BytesIO()
1540 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001541 self.assertRaises(UnicodeError, t.write, "\xff")
1542 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001543 b = self.BytesIO()
1544 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001545 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001546 t.write("abc\xffdef\n")
1547 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001548 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001549 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001550 b = self.BytesIO()
1551 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001552 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001553 t.write("abc\xffdef\n")
1554 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001555 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001556
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001557 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001558 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1559
1560 tests = [
1561 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001562 [ '', input_lines ],
1563 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1564 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1565 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001566 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001567 encodings = (
1568 'utf-8', 'latin-1',
1569 'utf-16', 'utf-16-le', 'utf-16-be',
1570 'utf-32', 'utf-32-le', 'utf-32-be',
1571 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001572
Guido van Rossum8358db22007-08-18 21:39:55 +00001573 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001574 # character in TextIOWrapper._pending_line.
1575 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001576 # XXX: str.encode() should return bytes
1577 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001578 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001579 for bufsize in range(1, 10):
1580 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001581 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1582 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001583 encoding=encoding)
1584 if do_reads:
1585 got_lines = []
1586 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001587 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001588 if c2 == '':
1589 break
1590 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001591 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001592 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001593 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001594
1595 for got_line, exp_line in zip(got_lines, exp_lines):
1596 self.assertEquals(got_line, exp_line)
1597 self.assertEquals(len(got_lines), len(exp_lines))
1598
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001599 def test_newlines_input(self):
1600 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001601 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1602 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001603 (None, normalized.decode("ascii").splitlines(True)),
1604 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001605 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1606 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1607 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001608 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001609 buf = self.BytesIO(testdata)
1610 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001611 self.assertEquals(txt.readlines(), expected)
1612 txt.seek(0)
1613 self.assertEquals(txt.read(), "".join(expected))
1614
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001615 def test_newlines_output(self):
1616 testdict = {
1617 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1618 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1619 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1620 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1621 }
1622 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1623 for newline, expected in tests:
1624 buf = self.BytesIO()
1625 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1626 txt.write("AAA\nB")
1627 txt.write("BB\nCCC\n")
1628 txt.write("X\rY\r\nZ")
1629 txt.flush()
1630 self.assertEquals(buf.closed, False)
1631 self.assertEquals(buf.getvalue(), expected)
1632
1633 def test_destructor(self):
1634 l = []
1635 base = self.BytesIO
1636 class MyBytesIO(base):
1637 def close(self):
1638 l.append(self.getvalue())
1639 base.close(self)
1640 b = MyBytesIO()
1641 t = self.TextIOWrapper(b, encoding="ascii")
1642 t.write("abc")
1643 del t
1644 self.assertEquals([b"abc"], l)
1645
1646 def test_override_destructor(self):
1647 record = []
1648 class MyTextIO(self.TextIOWrapper):
1649 def __del__(self):
1650 record.append(1)
1651 try:
1652 f = super().__del__
1653 except AttributeError:
1654 pass
1655 else:
1656 f()
1657 def close(self):
1658 record.append(2)
1659 super().close()
1660 def flush(self):
1661 record.append(3)
1662 super().flush()
1663 b = self.BytesIO()
1664 t = MyTextIO(b, encoding="ascii")
1665 del t
1666 self.assertEqual(record, [1, 2, 3])
1667
1668 def test_error_through_destructor(self):
1669 # Test that the exception state is not modified by a destructor,
1670 # even if close() fails.
1671 rawio = self.CloseFailureIO()
1672 def f():
1673 self.TextIOWrapper(rawio).xyzzy
1674 with support.captured_output("stderr") as s:
1675 self.assertRaises(AttributeError, f)
1676 s = s.getvalue().strip()
1677 if s:
1678 # The destructor *may* have printed an unraisable error, check it
1679 self.assertEqual(len(s.splitlines()), 1)
1680 self.assert_(s.startswith("Exception IOError: "), s)
1681 self.assert_(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001682
Guido van Rossum9b76da62007-04-11 01:09:03 +00001683 # Systematic tests of the text I/O API
1684
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001685 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001686 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1687 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001688 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001689 f._CHUNK_SIZE = chunksize
1690 self.assertEquals(f.write("abc"), 3)
1691 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001692 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001693 f._CHUNK_SIZE = chunksize
1694 self.assertEquals(f.tell(), 0)
1695 self.assertEquals(f.read(), "abc")
1696 cookie = f.tell()
1697 self.assertEquals(f.seek(0), 0)
1698 self.assertEquals(f.read(2), "ab")
1699 self.assertEquals(f.read(1), "c")
1700 self.assertEquals(f.read(1), "")
1701 self.assertEquals(f.read(), "")
1702 self.assertEquals(f.tell(), cookie)
1703 self.assertEquals(f.seek(0), 0)
1704 self.assertEquals(f.seek(0, 2), cookie)
1705 self.assertEquals(f.write("def"), 3)
1706 self.assertEquals(f.seek(cookie), cookie)
1707 self.assertEquals(f.read(), "def")
1708 if enc.startswith("utf"):
1709 self.multi_line_test(f, enc)
1710 f.close()
1711
1712 def multi_line_test(self, f, enc):
1713 f.seek(0)
1714 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001715 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001716 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001717 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001718 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001719 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001720 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001721 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001722 wlines.append((f.tell(), line))
1723 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001724 f.seek(0)
1725 rlines = []
1726 while True:
1727 pos = f.tell()
1728 line = f.readline()
1729 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001730 break
1731 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +00001732 self.assertEquals(rlines, wlines)
1733
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001734 def test_telling(self):
1735 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001736 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001737 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001738 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001739 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001740 p2 = f.tell()
1741 f.seek(0)
1742 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001743 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001744 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001745 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001746 self.assertEquals(f.tell(), p2)
1747 f.seek(0)
1748 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001749 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001750 self.assertRaises(IOError, f.tell)
1751 self.assertEquals(f.tell(), p2)
1752 f.close()
1753
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001754 def test_seeking(self):
1755 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001756 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001757 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001758 prefix = bytes(u_prefix.encode("utf-8"))
1759 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001760 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001761 suffix = bytes(u_suffix.encode("utf-8"))
1762 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001763 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001764 f.write(line*2)
1765 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001766 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001767 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001768 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001769 self.assertEquals(f.tell(), prefix_size)
1770 self.assertEquals(f.readline(), u_suffix)
1771
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001772 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00001773 # Regression test for a specific bug
1774 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001775 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001776 f.write(data)
1777 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001778 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001779 f._CHUNK_SIZE # Just test that it exists
1780 f._CHUNK_SIZE = 2
1781 f.readline()
1782 f.tell()
1783
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001784 def test_seek_and_tell(self):
1785 #Test seek/tell using the StatefulIncrementalDecoder.
1786 # Make test faster by doing smaller seeks
1787 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001788
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001789 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001790 """Tell/seek to various points within a data stream and ensure
1791 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001792 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001793 f.write(data)
1794 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001795 f = self.open(support.TESTFN, encoding='test_decoder')
1796 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001797 decoded = f.read()
1798 f.close()
1799
Neal Norwitze2b07052008-03-18 19:52:05 +00001800 for i in range(min_pos, len(decoded) + 1): # seek positions
1801 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001802 f = self.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001803 self.assertEquals(f.read(i), decoded[:i])
1804 cookie = f.tell()
1805 self.assertEquals(f.read(j), decoded[i:i + j])
1806 f.seek(cookie)
1807 self.assertEquals(f.read(), decoded[i:])
1808 f.close()
1809
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001810 # Enable the test decoder.
1811 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001812
1813 # Run the tests.
1814 try:
1815 # Try each test case.
1816 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001817 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001818
1819 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001820 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1821 offset = CHUNK_SIZE - len(input)//2
1822 prefix = b'.'*offset
1823 # Don't bother seeking into the prefix (takes too long).
1824 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001825 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001826
1827 # Ensure our test decoder won't interfere with subsequent tests.
1828 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001829 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001830
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001831 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001832 data = "1234567890"
1833 tests = ("utf-16",
1834 "utf-16-le",
1835 "utf-16-be",
1836 "utf-32",
1837 "utf-32-le",
1838 "utf-32-be")
1839 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001840 buf = self.BytesIO()
1841 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001842 # Check if the BOM is written only once (see issue1753).
1843 f.write(data)
1844 f.write(data)
1845 f.seek(0)
1846 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00001847 f.seek(0)
1848 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001849 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1850
Benjamin Petersona1b49012009-03-31 23:11:32 +00001851 def test_unreadable(self):
1852 class UnReadable(self.BytesIO):
1853 def readable(self):
1854 return False
1855 txt = self.TextIOWrapper(UnReadable())
1856 self.assertRaises(IOError, txt.read)
1857
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001858 def test_read_one_by_one(self):
1859 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001860 reads = ""
1861 while True:
1862 c = txt.read(1)
1863 if not c:
1864 break
1865 reads += c
1866 self.assertEquals(reads, "AA\nBB")
1867
1868 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001869 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001870 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001871 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001872 reads = ""
1873 while True:
1874 c = txt.read(128)
1875 if not c:
1876 break
1877 reads += c
1878 self.assertEquals(reads, "A"*127+"\nB")
1879
1880 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001881 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001882
1883 # read one char at a time
1884 reads = ""
1885 while True:
1886 c = txt.read(1)
1887 if not c:
1888 break
1889 reads += c
1890 self.assertEquals(reads, self.normalized)
1891
1892 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001893 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001894 txt._CHUNK_SIZE = 4
1895
1896 reads = ""
1897 while True:
1898 c = txt.read(4)
1899 if not c:
1900 break
1901 reads += c
1902 self.assertEquals(reads, self.normalized)
1903
1904 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001905 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001906 txt._CHUNK_SIZE = 4
1907
1908 reads = txt.read(4)
1909 reads += txt.read(4)
1910 reads += txt.readline()
1911 reads += txt.readline()
1912 reads += txt.readline()
1913 self.assertEquals(reads, self.normalized)
1914
1915 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001916 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001917 txt._CHUNK_SIZE = 4
1918
1919 reads = txt.read(4)
1920 reads += txt.read()
1921 self.assertEquals(reads, self.normalized)
1922
1923 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001924 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001925 txt._CHUNK_SIZE = 4
1926
1927 reads = txt.read(4)
1928 pos = txt.tell()
1929 txt.seek(0)
1930 txt.seek(pos)
1931 self.assertEquals(txt.read(4), "BBB\n")
1932
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001933 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001934 buffer = self.BytesIO(self.testdata)
1935 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001936
1937 self.assertEqual(buffer.seekable(), txt.seekable())
1938
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001939class CTextIOWrapperTest(TextIOWrapperTest):
1940
1941 def test_initialization(self):
1942 r = self.BytesIO(b"\xc3\xa9\n\n")
1943 b = self.BufferedReader(r, 1000)
1944 t = self.TextIOWrapper(b)
1945 self.assertRaises(TypeError, t.__init__, b, newline=42)
1946 self.assertRaises(ValueError, t.read)
1947 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1948 self.assertRaises(ValueError, t.read)
1949
1950 def test_garbage_collection(self):
1951 # C TextIOWrapper objects are collected, and collecting them flushes
1952 # all data to disk.
1953 # The Python version has __del__, so it ends in gc.garbage instead.
1954 rawio = io.FileIO(support.TESTFN, "wb")
1955 b = self.BufferedWriter(rawio)
1956 t = self.TextIOWrapper(b, encoding="ascii")
1957 t.write("456def")
1958 t.x = t
1959 wr = weakref.ref(t)
1960 del t
1961 gc.collect()
1962 self.assert_(wr() is None, wr)
1963 with open(support.TESTFN, "rb") as f:
1964 self.assertEqual(f.read(), b"456def")
1965
1966class PyTextIOWrapperTest(TextIOWrapperTest):
1967 pass
1968
1969
1970class IncrementalNewlineDecoderTest(unittest.TestCase):
1971
1972 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00001973 # UTF-8 specific tests for a newline decoder
1974 def _check_decode(b, s, **kwargs):
1975 # We exercise getstate() / setstate() as well as decode()
1976 state = decoder.getstate()
1977 self.assertEquals(decoder.decode(b, **kwargs), s)
1978 decoder.setstate(state)
1979 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001980
Antoine Pitrou180a3362008-12-14 16:36:46 +00001981 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001982
Antoine Pitrou180a3362008-12-14 16:36:46 +00001983 _check_decode(b'\xe8', "")
1984 _check_decode(b'\xa2', "")
1985 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001986
Antoine Pitrou180a3362008-12-14 16:36:46 +00001987 _check_decode(b'\xe8', "")
1988 _check_decode(b'\xa2', "")
1989 _check_decode(b'\x88', "\u8888")
1990
1991 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001992 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1993
Antoine Pitrou180a3362008-12-14 16:36:46 +00001994 decoder.reset()
1995 _check_decode(b'\n', "\n")
1996 _check_decode(b'\r', "")
1997 _check_decode(b'', "\n", final=True)
1998 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001999
Antoine Pitrou180a3362008-12-14 16:36:46 +00002000 _check_decode(b'\r', "")
2001 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002002
Antoine Pitrou180a3362008-12-14 16:36:46 +00002003 _check_decode(b'\r\r\n', "\n\n")
2004 _check_decode(b'\r', "")
2005 _check_decode(b'\r', "\n")
2006 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002007
Antoine Pitrou180a3362008-12-14 16:36:46 +00002008 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2009 _check_decode(b'\xe8\xa2\x88', "\u8888")
2010 _check_decode(b'\n', "\n")
2011 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2012 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002013
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002014 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002015 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002016 if encoding is not None:
2017 encoder = codecs.getincrementalencoder(encoding)()
2018 def _decode_bytewise(s):
2019 # Decode one byte at a time
2020 for b in encoder.encode(s):
2021 result.append(decoder.decode(bytes([b])))
2022 else:
2023 encoder = None
2024 def _decode_bytewise(s):
2025 # Decode one char at a time
2026 for c in s:
2027 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002028 self.assertEquals(decoder.newlines, None)
2029 _decode_bytewise("abc\n\r")
2030 self.assertEquals(decoder.newlines, '\n')
2031 _decode_bytewise("\nabc")
2032 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2033 _decode_bytewise("abc\r")
2034 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2035 _decode_bytewise("abc")
2036 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2037 _decode_bytewise("abc\r")
2038 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2039 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002040 input = "abc"
2041 if encoder is not None:
2042 encoder.reset()
2043 input = encoder.encode(input)
2044 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002045 self.assertEquals(decoder.newlines, None)
2046
2047 def test_newline_decoder(self):
2048 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002049 # None meaning the IncrementalNewlineDecoder takes unicode input
2050 # rather than bytes input
2051 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002052 'utf-16', 'utf-16-le', 'utf-16-be',
2053 'utf-32', 'utf-32-le', 'utf-32-be',
2054 )
2055 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002056 decoder = enc and codecs.getincrementaldecoder(enc)()
2057 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2058 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002059 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002060 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2061 self.check_newline_decoding_utf8(decoder)
2062
Antoine Pitrou66913e22009-03-06 23:40:56 +00002063 def test_newline_bytes(self):
2064 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2065 def _check(dec):
2066 self.assertEquals(dec.newlines, None)
2067 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2068 self.assertEquals(dec.newlines, None)
2069 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2070 self.assertEquals(dec.newlines, None)
2071 dec = self.IncrementalNewlineDecoder(None, translate=False)
2072 _check(dec)
2073 dec = self.IncrementalNewlineDecoder(None, translate=True)
2074 _check(dec)
2075
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002076class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2077 pass
2078
2079class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2080 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002081
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002082
Guido van Rossum01a27522007-03-07 01:00:12 +00002083# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002084
Guido van Rossum5abbf752007-08-27 17:39:33 +00002085class MiscIOTest(unittest.TestCase):
2086
Barry Warsaw40e82462008-11-20 20:14:50 +00002087 def tearDown(self):
2088 support.unlink(support.TESTFN)
2089
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002090 def test___all__(self):
2091 for name in self.io.__all__:
2092 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002093 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002094 if name == "open":
2095 continue
2096 elif "error" in name.lower():
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002097 self.assertTrue(issubclass(obj, Exception), name)
2098 elif not name.startswith("SEEK_"):
2099 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002100
Barry Warsaw40e82462008-11-20 20:14:50 +00002101 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002102 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002103 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002104 f.close()
2105
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002106 f = self.open(support.TESTFN, "U")
Barry Warsaw40e82462008-11-20 20:14:50 +00002107 self.assertEquals(f.name, support.TESTFN)
2108 self.assertEquals(f.buffer.name, support.TESTFN)
2109 self.assertEquals(f.buffer.raw.name, support.TESTFN)
2110 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002111 self.assertEquals(f.buffer.mode, "rb")
2112 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002113 f.close()
2114
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002115 f = self.open(support.TESTFN, "w+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002116 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002117 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2118 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002119
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002120 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002121 self.assertEquals(g.mode, "wb")
2122 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002123 self.assertEquals(g.name, f.fileno())
2124 self.assertEquals(g.raw.name, f.fileno())
2125 f.close()
2126 g.close()
2127
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002128 def test_io_after_close(self):
2129 for kwargs in [
2130 {"mode": "w"},
2131 {"mode": "wb"},
2132 {"mode": "w", "buffering": 1},
2133 {"mode": "w", "buffering": 2},
2134 {"mode": "wb", "buffering": 0},
2135 {"mode": "r"},
2136 {"mode": "rb"},
2137 {"mode": "r", "buffering": 1},
2138 {"mode": "r", "buffering": 2},
2139 {"mode": "rb", "buffering": 0},
2140 {"mode": "w+"},
2141 {"mode": "w+b"},
2142 {"mode": "w+", "buffering": 1},
2143 {"mode": "w+", "buffering": 2},
2144 {"mode": "w+b", "buffering": 0},
2145 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002146 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002147 f.close()
2148 self.assertRaises(ValueError, f.flush)
2149 self.assertRaises(ValueError, f.fileno)
2150 self.assertRaises(ValueError, f.isatty)
2151 self.assertRaises(ValueError, f.__iter__)
2152 if hasattr(f, "peek"):
2153 self.assertRaises(ValueError, f.peek, 1)
2154 self.assertRaises(ValueError, f.read)
2155 if hasattr(f, "read1"):
2156 self.assertRaises(ValueError, f.read1, 1024)
2157 if hasattr(f, "readinto"):
2158 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2159 self.assertRaises(ValueError, f.readline)
2160 self.assertRaises(ValueError, f.readlines)
2161 self.assertRaises(ValueError, f.seek, 0)
2162 self.assertRaises(ValueError, f.tell)
2163 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002164 self.assertRaises(ValueError, f.write,
2165 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002166 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002167 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002168
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002169 def test_blockingioerror(self):
2170 # Various BlockingIOError issues
2171 self.assertRaises(TypeError, self.BlockingIOError)
2172 self.assertRaises(TypeError, self.BlockingIOError, 1)
2173 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2174 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2175 b = self.BlockingIOError(1, "")
2176 self.assertEqual(b.characters_written, 0)
2177 class C(str):
2178 pass
2179 c = C("")
2180 b = self.BlockingIOError(1, c)
2181 c.b = b
2182 b.c = c
2183 wr = weakref.ref(c)
2184 del c, b
2185 gc.collect()
2186 self.assert_(wr() is None, wr)
2187
2188 def test_abcs(self):
2189 # Test the visible base classes are ABCs.
2190 self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
2191 self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
2192 self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
2193 self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))
2194
2195 def _check_abc_inheritance(self, abcmodule):
2196 with self.open(support.TESTFN, "wb", buffering=0) as f:
2197 self.assertTrue(isinstance(f, abcmodule.IOBase))
2198 self.assertTrue(isinstance(f, abcmodule.RawIOBase))
2199 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2200 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2201 with self.open(support.TESTFN, "wb") as f:
2202 self.assertTrue(isinstance(f, abcmodule.IOBase))
2203 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2204 self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
2205 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2206 with self.open(support.TESTFN, "w") as f:
2207 self.assertTrue(isinstance(f, abcmodule.IOBase))
2208 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2209 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2210 self.assertTrue(isinstance(f, abcmodule.TextIOBase))
2211
2212 def test_abc_inheritance(self):
2213 # Test implementations inherit from their respective ABCs
2214 self._check_abc_inheritance(self)
2215
2216 def test_abc_inheritance_official(self):
2217 # Test implementations inherit from the official ABCs of the
2218 # baseline "io" module.
2219 self._check_abc_inheritance(io)
2220
2221class CMiscIOTest(MiscIOTest):
2222 io = io
2223
2224class PyMiscIOTest(MiscIOTest):
2225 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002226
Guido van Rossum28524c72007-02-27 05:47:44 +00002227def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002228 tests = (CIOTest, PyIOTest,
2229 CBufferedReaderTest, PyBufferedReaderTest,
2230 CBufferedWriterTest, PyBufferedWriterTest,
2231 CBufferedRWPairTest, PyBufferedRWPairTest,
2232 CBufferedRandomTest, PyBufferedRandomTest,
2233 StatefulIncrementalDecoderTest,
2234 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2235 CTextIOWrapperTest, PyTextIOWrapperTest,
2236 CMiscIOTest, PyMiscIOTest,)
2237
2238 # Put the namespaces of the IO module we are testing and some useful mock
2239 # classes in the __dict__ of each test.
2240 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
2241 MockNonBlockWriterIO)
2242 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2243 c_io_ns = {name : getattr(io, name) for name in all_members}
2244 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2245 globs = globals()
2246 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2247 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2248 # Avoid turning open into a bound method.
2249 py_io_ns["open"] = pyio.OpenWrapper
2250 for test in tests:
2251 if test.__name__.startswith("C"):
2252 for name, obj in c_io_ns.items():
2253 setattr(test, name, obj)
2254 elif test.__name__.startswith("Py"):
2255 for name, obj in py_io_ns.items():
2256 setattr(test, name, obj)
2257
2258 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002259
2260if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002261 test_main()