blob: fc109f3ff6b46bb784ecb867a112566424bab9a3 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)
12
################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Georg Brandl1b37e872010-03-14 10:45:50 +000032from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000033from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000034from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000035
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000036import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000037import io # C implementation of io
38import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000039try:
40 import threading
41except ImportError:
42 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000043
Guido van Rossuma9e20242007-03-08 00:43:48 +000044
def _default_chunk_size():
    """Return the chunk size of a freshly opened text-mode file object.

    This module's own source file is opened purely because it is a text
    file that is known to exist; its contents are never read.
    """
    with open(__file__, mode="r", encoding="latin1") as stream:
        chunk_size = stream._CHUNK_SIZE
    return chunk_size
49
50
class MockRawIO:
    """Scripted raw-stream mixin that records how it is used.

    ``read_stack`` is an iterable of canned results.  Each ``read()`` hands
    out the next entry whole; ``readinto()`` copies from the front entry and
    keeps whatever did not fit for the next call.  Everything passed to
    ``write()`` is stored in ``_write_stack``.

    Counters:
        _reads: number of ``read()``/``readinto()`` calls made so far.
        _extraneous_reads: how many of those calls found the script empty.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0
        self._extraneous_reads = 0

    def read(self, n=None):
        """Return the next scripted entry (``n`` is ignored), or b"" at end."""
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script counts as end of data; any other
            # exception is a genuine error and must not be swallowed.
            self._extraneous_reads += 1
            return b""

    def write(self, b):
        """Store a bytes copy of ``b`` and report all of it as written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy scripted data into ``buf`` and return the byte count.

        Returns 0 when the script is exhausted.  A ``None`` entry in the
        script is consumed and passed through as a ``None`` return value.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            # The whole entry fits: consume it.
            del self._read_stack[0]
            buf[:n] = data
            return n
        # Entry is larger than the buffer: fill the buffer and leave the
        # remainder at the front of the script for the next call.
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        return pos
112
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with RawIOBase from the C ``io`` module."""
115
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with RawIOBase from the pure-Python ``_pyio``."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000118
Guido van Rossuma9e20242007-03-08 00:43:48 +0000119
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose return values deliberately break the rules.

    Counts are inflated, data is duplicated and positions are negative.
    """

    def write(self, b):
        # Claim twice as many bytes as the parent actually stored.
        stored = super().write(b)
        return stored * 2

    def read(self, n=None):
        # Hand back the scripted data repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then report five times its size.
        super().readinto(buf)
        capacity = len(buf)
        return capacity * 5
136
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with RawIOBase from the C ``io`` module."""
139
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with RawIOBase from pure-Python ``_pyio``."""
142
143
class CloseFailureIO(MockRawIO):
    """MockRawIO variant whose first close() raises IOError.

    The stream is marked closed before the error is raised, so later
    close() calls return quietly.
    """

    closed = 0

    def close(self):
        if self.closed:
            return
        self.closed = 1
        raise IOError
151
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with RawIOBase from the C ``io`` module."""
154
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with RawIOBase from pure-Python ``_pyio``."""
157
158
class MockFileIO:
    """Mixin that logs the size of every read made through it.

    It is meant to sit in front of a real stream class in the MRO.  Each
    ``read()``/``readinto()`` appends to ``read_history`` the number of
    bytes obtained, or None when the underlying call returned None.
    """

    def __init__(self, data):
        # The log must exist before the next class in the MRO initialises.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000174
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO logging reads of a BytesIO from the C ``io`` module."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000177
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO logging reads of a BytesIO from pure-Python ``_pyio``."""
180
181
class MockNonBlockWriterIO:
    """Raw-writer mixin that can be told to block in the middle of a write.

    Concrete subclasses must provide a ``BlockingIOError`` class attribute;
    it is the exception type raised when the blocker is hit.
    """

    def __init__(self):
        self._blocker_char = None
        self._write_stack = []

    def pop_written(self):
        """Return everything accepted so far as one bytes object and reset."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Accept ``b``, or only the part of it that precedes the blocker.

        When a blocker is armed and occurs in the data, the bytes before it
        are stored, the blocker is disarmed and ``self.BlockingIOError`` is
        raised carrying the number of bytes that were stored.
        """
        payload = bytes(b)
        if self._blocker_char:
            try:
                cut = payload.index(self._blocker_char)
            except ValueError:
                # Blocker absent from this payload: accept it in full below.
                pass
            else:
                self._blocker_char = None
                self._write_stack.append(payload[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(payload)
        return len(payload)
220
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on the C ``io`` module's RawIOBase."""

    # Exception type raised by write() when the blocker is hit.
    BlockingIOError = io.BlockingIOError
223
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on the pure-Python ``_pyio`` RawIOBase."""

    # Exception type raised by write() when the blocker is hit.
    BlockingIOError = pyio.BlockingIOError
226
Guido van Rossuma9e20242007-03-08 00:43:48 +0000227
class IOTest(unittest.TestCase):
    """Implementation-independent checks of open(), FileIO and BytesIO.

    The methods reach the objects under test through attributes of the test
    case (``self.open``, ``self.FileIO``, ``self.BytesIO``, ``self.IOBase``,
    ``self.RawIOBase``, ``self.BufferedIOBase``, ``self.TextIOBase``).
    Nothing in this class binds them, so they must be supplied from outside.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Shared write/seek/truncate scenario; leaves b"hello world\n" in f.
        self.assertEqual(f.write(b"blah."), 5)
        # truncate() must not move the file position.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Shared read/readinto/seek scenario; f must hold b"hello world\n".
        # With buffered=True, read() without a size is exercised as well.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Short read at end of file: only the first two bytes are replaced.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        # Shared scenario working at offsets around self.LARGE.
        # Use unittest assertions rather than bare ``assert`` so the checks
        # still run under ``python -O``.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # truncate(size) must leave the position where it was.
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a real skip instead of printing and passing.
                self.skipTest(
                    "requires %d bytes and a long time on %s; "
                    "use 'regrtest.py -u largefile test_io' to run it"
                    % (self.LARGE, sys.platform))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on leaving the with block, whether the
        # block ends normally or by an exception.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertTrue(f.tell() > 0)

    def test_destructor(self):
        # Dropping the last reference must run __del__, close() and flush()
        # in that order, and the data must end up in the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Same ordering check as test_destructor, for an abstract base class.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        # Size of the array's data in bytes.  Computed from itemsize because
        # array.tostring() no longer exists in current Python versions.
        n = len(a) * a.itemsize
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Reference cycle: only the garbage collector can reclaim f.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        # Closing repeatedly is harmless; using the closed file is an error.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)
557
class CIOTest(IOTest):
    """IOTest variant for the C implementation.

    NOTE(review): the attributes selecting the implementation are not set
    here; presumably they are bound elsewhere in the file -- confirm.
    """
Guido van Rossuma9e20242007-03-08 00:43:48 +0000560
class PyIOTest(IOTest):
    """IOTest variant for the pure-Python implementation.

    NOTE(review): the attributes selecting the implementation are not set
    here; presumably they are bound elsewhere in the file -- confirm.
    """
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000563
Guido van Rossuma9e20242007-03-08 00:43:48 +0000564
class CommonBufferedTests:
    """Mixin of tests shared by the buffered-stream test cases.

    The concrete test case must provide ``self.tp`` (the buffered class
    under test) together with ``self.MockRawIO`` and ``self.CloseFailureIO``
    (the raw stream classes to wrap), and must also be a unittest.TestCase.
    """
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom

    def test_detach(self):
        # detach() returns the raw stream once; a second call is an error.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        # assertEqual, not the deprecated assertEquals alias.
        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        # Overridden __del__/close()/flush() must run, in that order, when
        # the object is destroyed; flush() is only expected when writable.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        # The repr shows the class, plus the raw stream's name once it has
        # one (str or bytes).
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        raw = self.MockRawIO()
        def bad_flush():
            raise IOError()
        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(IOError, b.close) # exception not swallowed

    def test_multi_close(self):
        # Closing repeatedly is harmless; using the closed object is an error.
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)
672
Guido van Rossum78892e42007-04-06 17:31:18 +0000673
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Implementation-independent tests for a buffered reader.  Concrete
    # subclasses bind ``tp`` to the type under test; the mock raw streams and
    # ``open`` reached through ``self`` are supplied from outside this class.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be re-run with valid buffer sizes; non-positive sizes
        # are rejected with ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        # read(None) and a read of exactly the available size both return
        # everything the raw stream provides.
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # The _reads counter of the mock is checked after each call to track
        # how many raw reads were issued.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        # The same 2-byte buffer is reused, so a short final read leaves the
        # previous trailing byte ("f") in place.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")

    def test_readlines(self):
        # A fresh reader is built for every call so each assertion starts
        # from the beginning of the data.
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to bufio.read(), and the
        # expected read_history recorded by the mock raw file.
        tests = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

    def test_read_past_eof(self):
        # Asking for more than is available returns just what exists.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            byte_values = list(range(256)) * N
            random.shuffle(byte_values)
            payload = bytes(bytearray(byte_values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=reader) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                # Every byte value must have been read exactly N times.
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(combined.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
851
852
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the shared reader tests against io.BufferedReader and adds checks
    # specific to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each rejected __init__ call, read() must raise ValueError
        # as well.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Remove the scratch file when the test ends so it does not leak
        # into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.f = f  # self-reference: a reference cycle through the object
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000893
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared reader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000896
Guido van Rossuma9e20242007-03-08 00:43:48 +0000897
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Implementation-independent tests for a buffered writer.  Concrete
    # subclasses bind ``tp`` to the type under test; the mock raw streams and
    # ``open`` reached through ``self`` are supplied from outside this class.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be re-run with valid buffer sizes; non-positive sizes
        # are rejected with ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # Nothing reaches the raw stream until detach(), which flushes.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # ``intermediate_func`` is called with the buffered object after
        # every write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Both helpers end by seeking back to the starting position.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must push buffered data to the raw
        # stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # Remove the scratch file when the test ends so it does not leak
        # into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def writer():
                    try:
                        while True:
                            try:
                                chunk = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=writer) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Every byte value must have been written exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit the deprecation
        # warning matched below.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001112
1113
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the shared writer tests against io.BufferedWriter and adds checks
    # specific to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each rejected __init__ call, write() must raise ValueError
        # as well.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # Remove the scratch file when the test ends so it does not leak
        # into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        f.x = f  # self-reference: a reference cycle through the object
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1151
1152
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared writer tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001155
class BufferedRWPairTest(unittest.TestCase):
    # Implementation-independent tests for a reader/writer pair.  Concrete
    # subclasses bind ``tp`` to the type under test.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must emit the deprecation
        # warning matched below.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # A fresh pair is built for every call so each assertion starts from
        # the beginning of the data.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # A hint of None means "no limit", as in the reader tests; this
        # replaces a verbatim repeat of the assertion above.
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        # Each write is flushed separately, so the writer-side mock records
        # two distinct chunks.
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        # peek() must not consume: the following read() returns the same
        # bytes.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair reports a tty when either side does.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001273
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared pair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001276
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared pair tests against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001279
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001280
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Combines the reader and writer test suites for a read/write buffered
    # type and adds tests that interleave reads, writes and seeks.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        # A float offset is rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # ``read_func(bufio[, n])`` is the reading strategy under test; it
        # must return the bytes read and advance the position past them.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # A negative n means "read everything": use an oversized buffer.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance, so move past the returned bytes.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        # Second variant: step back one byte, peek, then restore the
        # position.
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        # Step back one byte and read it again after every write.
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)
1435
class CBufferedRandomTest(BufferedRandomTest):
    # Runs the shared random-access tests against io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        expected_errors = (OverflowError, MemoryError, ValueError)
        with self.assertRaises(expected_errors):
            buffered.__init__(raw_stream, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse both the reader-side and the writer-side collection checks.
        for suite in (CBufferedReaderTest, CBufferedWriterTest):
            suite.test_garbage_collection(self)
1451 CBufferedWriterTest.test_garbage_collection(self)
1452
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandom tests against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1455
1456
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.
1467
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  The input buffer is emptied after
    every word, including 'i' and 'o' words.  EOF also terminates the
    last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and an empty buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with flags == 0 right after reset()."""
        # XOR with 1 maps the initial value 1 to 0, so that flags = 0
        # after reset().
        i, o = self.i ^ 1, self.o ^ 1
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        # Named 'flags' rather than 'io' so the io module is not shadowed.
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Decode the bytes in *input* and return the resulting str.

        Bytes of an unfinished word stay in the buffer; when *final* is
        true, a non-empty buffer is emitted as the last word.
        """
        pieces = []  # joined once at the end instead of repeated str +=
        for b in input:
            if self.i == 0:  # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i' and 'o' words only update the lengths and produce ''.
        """
        output = ''
        first = self.buffer[0]
        if first == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif first == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if self.o:
                # pad out with hyphens, then truncate to output length
                output = output.ljust(self.o, '-')[:self.o]
            output += '.'
        self.buffer = bytearray()
        return output

    # The codec lookup below answers only while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: serve 'test_decoder' while enabled.

        Returns None (implicitly) for any other name or while disabled.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
1552
# Register the previous decoder for testing.
# Disabled by default (codecEnabled is False), tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1556
1557
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded output).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001600
1601class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001602
def setUp(self):
    # Raw sample with mixed line endings, plus the text expected once
    # every ending has been translated to "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = "AAA\nBBB\nCCC\nDDD\nEEE\n"
    # Start every test without a leftover scratch file.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001607
def tearDown(self):
    # Remove support.TESTFN again once the test has finished.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001610
def test_constructor(self):
    # Re-running __init__ on an existing wrapper must apply the new
    # settings and reject invalid newline arguments.
    r = self.BytesIO(b"\xc3\xa9\n\n")
    b = self.BufferedReader(r, 1000)
    t = self.TextIOWrapper(b)
    t.__init__(b, encoding="latin1", newline="\r\n")
    self.assertEqual(t.encoding, "latin1")
    self.assertEqual(t.line_buffering, False)
    t.__init__(b, encoding="utf8", line_buffering=True)
    self.assertEqual(t.encoding, "utf8")
    self.assertEqual(t.line_buffering, True)
    self.assertEqual("\xe9\n", t.readline())
    self.assertRaises(TypeError, t.__init__, b, newline=42)
    self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1624
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    # detach() hands back the underlying buffer object itself.
    wrapper = self.TextIOWrapper(buffered)
    self.assertIs(wrapper.detach(), buffered)

    # Pending text must reach the raw stream when detaching, and a
    # second detach() on the same wrapper is an error.
    wrapper = self.TextIOWrapper(buffered, encoding="ascii")
    wrapper.write("howdy")
    self.assertFalse(raw.getvalue())
    wrapper.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    self.assertRaises(ValueError, wrapper.detach)
1637
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    wrapper = self.TextIOWrapper(self.BufferedReader(raw), encoding="utf-8")
    modname = self.TextIOWrapper.__module__
    # Without a name only the encoding is shown.
    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)
    # A str name on the raw stream shows up in the repr.
    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)
    # A bytes name is rendered with its b'' prefix.
    raw.name = b"dummy"
    expected = "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001651
def test_line_buffering(self):
    # With line_buffering=True a write containing "\n" or "\r" must be
    # flushed through to the raw stream.
    r = self.BytesIO()
    b = self.BufferedWriter(r, 1000)
    t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
    t.write("X")
    self.assertEqual(r.getvalue(), b"")  # No flush happened
    t.write("Y\nZ")
    self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
    t.write("A\rB")
    self.assertEqual(r.getvalue(), b"XY\nZA\rB")
1662
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    explicit = self.TextIOWrapper(raw, encoding="utf8")
    self.assertEqual(explicit.encoding, "utf8")
    implicit = self.TextIOWrapper(raw)
    self.assertIsNotNone(implicit.encoding)
    # Raises LookupError if the default encoding is not a known codec.
    codecs.lookup(implicit.encoding)
1671
def test_encoding_errors_reading(self):
    # Decoding the non-ASCII byte 0xff under each error handler.
    # (1) default
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii")
    self.assertRaises(UnicodeError, t.read)
    # (2) explicit strict
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
    self.assertRaises(UnicodeError, t.read)
    # (3) ignore
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
    self.assertEqual(t.read(), "abc\n\n")
    # (4) replace
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
    self.assertEqual(t.read(), "abc\n\ufffd\n")
1689
def test_encoding_errors_writing(self):
    # Encoding the non-ASCII character "\xff" under each error handler.
    # (1) default
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    self.assertRaises(UnicodeError, t.write, "\xff")
    # (2) explicit strict
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
    self.assertRaises(UnicodeError, t.write, "\xff")
    # (3) ignore
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
                           newline="\n")
    t.write("abc\xffdef\n")
    t.flush()
    self.assertEqual(b.getvalue(), b"abcdef\n")
    # (4) replace
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
                           newline="\n")
    t.write("abc\xffdef\n")
    t.flush()
    self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001713
def test_newlines(self):
    # Read the same text under every newline mode, encoding and small
    # buffer size, both by iteration and by read(2) + readline().
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

    # Each entry is [newline argument, lines expected when reading].
    tests = [
        [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
        ['', input_lines],
        ['\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]],
        ['\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]],
        ['\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        # str.encode() already returns bytes; no extra wrapping needed.
        data = ''.join(input_lines).encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        got_lines = []
                        while True:
                            c2 = textio.read(2)
                            if c2 == '':
                                break
                            self.assertEqual(len(c2), 2)
                            got_lines.append(c2 + textio.readline())
                    else:
                        got_lines = list(textio)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
1755
def test_newlines_input(self):
    # readlines() and read() must agree for every newline mode.
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    for newline, expected in [
        (None, normalized.decode("ascii").splitlines(True)),
        ("", testdata.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
        ]:
        buf = self.BytesIO(testdata)
        txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
        self.assertEqual(txt.readlines(), expected)
        txt.seek(0)
        self.assertEqual(txt.read(), "".join(expected))
1771
def test_newlines_output(self):
    # Maps each newline argument to the bytes expected on output.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
        }
    # newline=None is expected to behave like the platform's os.linesep.
    tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
    for newline, expected in tests:
        buf = self.BytesIO()
        txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
        txt.write("AAA\nB")
        txt.write("BB\nCCC\n")
        txt.write("X\rY\r\nZ")
        txt.flush()
        self.assertEqual(buf.closed, False)
        self.assertEqual(buf.getvalue(), expected)
1789
def test_destructor(self):
    # Dropping the last reference to the wrapper must flush the pending
    # text and close the underlying stream.
    closed_with = []
    base = self.BytesIO
    class MyBytesIO(base):
        def close(self):
            # Record what had been written by the time of closing.
            closed_with.append(self.getvalue())
            base.close(self)
    b = MyBytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    t.write("abc")
    del t
    support.gc_collect()
    self.assertEqual([b"abc"], closed_with)
1803
def test_override_destructor(self):
    # Records the order in which the overridden hooks run when the
    # wrapper is destroyed: __del__ (1), close (2), flush (3).
    record = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            # The base class may or may not define __del__.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()
        def close(self):
            record.append(2)
            super().close()
        def flush(self):
            record.append(3)
            super().flush()
    wrapper = MyTextIO(self.BytesIO(), encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
1826
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def access_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as stderr:
        self.assertRaises(AttributeError, access_missing_attribute)
    message = stderr.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00001841
Guido van Rossum9b76da62007-04-11 01:09:03 +00001842 # Systematic tests of the text I/O API
1843
def test_basic_io(self):
    # Write, read, seek and tell on a real file for several chunk sizes
    # and encodings.  Files are opened in 'with' blocks so they are
    # closed even when an assertion fails.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                # The multi-line sample contains non-Latin-1 characters,
                # so it is only run for the utf encodings.
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
1872
def multi_line_test(self, f, enc):
    # Helper for test_basic_io: write lines of many lengths to *f*,
    # remembering tell() before each one, then read them back and check
    # that positions and contents match.  *enc* is accepted but unused.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build 'size' characters.
        chars = [sample[i % len(sample)] for i in range(size)]
        line = "".join(chars) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
1894
def test_telling(self):
    # tell() must reproduce the positions seen while writing, and must
    # raise while the file is being iterated over.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            self.assertRaises(IOError, f.tell)
        self.assertEqual(f.tell(), p2)
1914
def test_seeking(self):
    # Place a multi-byte character so that it straddles the end of the
    # first chunk, then check read(), tell() and readline() around it.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = u_suffix.encode("utf-8")
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    # The read handle used to be left open; close it deterministically.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
1932
def test_seeking_too(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # The read handle used to be left open; close it deterministically.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
1944
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                # 'with' closes the file even when an assertion fails.
                with self.open(support.TESTFN, encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = True

    # Run the tests.
    try:
        # Try each test case.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(input)

        # Position each test case so that it crosses a chunk boundary.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = False
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001991
def test_encoded_writes(self):
    # Two writes followed by reads must round-trip for the wide
    # encodings, with or without a BOM.
    data = "1234567890"
    tests = ("utf-16",
             "utf-16-le",
             "utf-16-be",
             "utf-32",
             "utf-32-le",
             "utf-32-be")
    for encoding in tests:
        buf = self.BytesIO()
        f = self.TextIOWrapper(buf, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        f.write(data)
        f.write(data)
        f.seek(0)
        self.assertEqual(f.read(), data * 2)
        f.seek(0)
        self.assertEqual(f.read(), data * 2)
        self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
2011
def test_unreadable(self):
    # read() on a wrapper whose stream reports readable() == False
    # has to raise IOError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False
    wrapper = self.TextIOWrapper(UnReadable())
    self.assertRaises(IOError, wrapper.read)
2018
def test_read_one_by_one(self):
    # Reading one character at a time must still translate "\r\n".
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    reads = ""
    while True:
        c = txt.read(1)
        if not c:
            break
        reads += c
    self.assertEqual(reads, "AA\nBB")
2028
def test_readlines(self):
    txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    # No hint and an explicit None hint both return every line.
    self.assertEqual(txt.readlines(), every_line)
    txt.seek(0)
    self.assertEqual(txt.readlines(None), every_line)
    txt.seek(0)
    # With a hint of 5 only the first two lines come back.
    self.assertEqual(txt.readlines(5), every_line[:2])
2036
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002037 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    reads = ""
    while True:
        c = txt.read(128)
        if not c:
            break
        reads += c
    self.assertEqual(reads, "A"*127+"\nB")
2048
def test_issue1395_1(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    # read one char at a time
    reads = ""
    while True:
        c = txt.read(1)
        if not c:
            break
        reads += c
    self.assertEqual(reads, self.normalized)
2060
def test_issue1395_2(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # read four chars at a time, matching the chunk size
    reads = ""
    while True:
        c = txt.read(4)
        if not c:
            break
        reads += c
    self.assertEqual(reads, self.normalized)
2072
def test_issue1395_3(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # mix fixed-size reads with readline() calls
    reads = txt.read(4)
    reads += txt.read(4)
    reads += txt.readline()
    reads += txt.readline()
    reads += txt.readline()
    self.assertEqual(reads, self.normalized)
2083
def test_issue1395_4(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # one fixed-size read followed by reading the remainder
    reads = txt.read(4)
    reads += txt.read()
    self.assertEqual(reads, self.normalized)
2091
def test_issue1395_5(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Only the position after the first read matters; its text is unused.
    txt.read(4)
    pos = txt.tell()
    txt.seek(0)
    txt.seek(pos)
    self.assertEqual(txt.read(4), "BBB\n")
2101
def test_issue2282(self):
    # The wrapper must report the same seekable() value as its stream.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
2107
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # The tell() call is kept, but its result is not needed here.
            f.tell()
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2122
def test_seek_bom(self):
    # Same test, but when seeking manually
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # Remember the end-of-data position to seek back to below.
            pos = f.tell()
        with self.open(filename, 'r+', encoding=charset) as f:
            # Write at the old end of the file first, then overwrite the
            # beginning.
            f.seek(pos)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(filename, 'rb') as f:
            # Encoding the whole text at once yields a single BOM, so
            # equality proves neither write inserted an extra one.
            self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
2137
def test_errors_property(self):
    # The ``errors`` attribute reports "strict" when no handler is given
    # and otherwise echoes the handler passed to open().
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra_kwargs, expected in cases:
        with self.open(support.TESTFN, "w", **extra_kwargs) as stream:
            self.assertEqual(stream.errors, expected)
2143
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    event = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            # Format the line before waiting so that, once the event is
            # set, every thread goes straight to write().
            text = "Thread%03d\n" % n
            event.wait()
            f.write(text)
        # Pass the index through ``args`` instead of a default-argument
        # lambda; each thread still receives its own value of x.
        threads = [threading.Thread(target=run, args=(x,))
                   for x in range(20)]
        for t in threads:
            t.start()
        # Short pause before releasing the threads, kept from the
        # original test.
        time.sleep(0.02)
        event.set()
        for t in threads:
            t.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        for n in range(20):
            # Every thread's line must appear exactly once.
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
2165
def test_flush_error_on_close(self):
    # An error raised by flush() while closing must reach the caller
    # instead of being swallowed.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")

    def failing_flush():
        raise IOError()

    # Shadow the bound method on the instance so close() hits the failure.
    wrapper.flush = failing_flush
    with self.assertRaises(IOError):
        wrapper.close()
2172
def test_multi_close(self):
    # close() may be called repeatedly without error; flush() on the
    # closed wrapper raises ValueError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    for _ in range(3):
        wrapper.close()
    with self.assertRaises(ValueError):
        wrapper.flush()
2179
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest cases plus two checks defined only for this variant.

    test_main() copies the ``io`` module's namespace onto every test class
    whose name starts with "C", which is where ``self.TextIOWrapper``,
    ``self.BytesIO`` and friends come from.
    """

    def test_initialization(self):
        # A failed re-initialisation leaves the wrapper unusable: read()
        # raises ValueError after each rejected __init__ call.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        rejected = (
            (TypeError, 42),
            (ValueError, 'xyzzy'),
        )
        for expected_exc, bad_newline in rejected:
            self.assertRaises(expected_exc, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw_file = io.FileIO(support.TESTFN, "wb")
        writer = self.BufferedWriter(raw_file)
        wrapper = self.TextIOWrapper(writer, encoding="ascii")
        wrapper.write("456def")
        # Self-reference creates a cycle that only the collector can break.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as stream:
            data = stream.read()
        self.assertEqual(data, b"456def")
2206
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the inherited TextIOWrapperTest cases unchanged. test_main()
    # copies the ``pyio`` namespace onto every test class whose name starts
    # with "Py", so the inherited tests exercise that implementation.
    pass
2209
2210
class IncrementalNewlineDecoderTest(unittest.TestCase):
    """Shared tests for ``IncrementalNewlineDecoder``.

    The tests use ``self.IncrementalNewlineDecoder``, which is not defined
    here; test_main() sets it on the concrete subclasses.
    """

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            # Rewind to the saved state: decoding the same input again
            # must give the same output.
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        _check_decode(b'\xe8\xa2\x88', "\u8888")

        # The same character fed one byte at a time: nothing is emitted
        # until the final byte arrives.
        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        # A truncated sequence at end of input is an error.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_decode(b'\n', "\n")
        # A lone '\r' is held back until the next input (or final=True)
        # shows whether it is part of '\r\n'.
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to *decoder* one unit at a time and check both the
        # translated output and the ``newlines`` attribute along the way.
        # *encoding* is None when the decoder takes str input directly.
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(bytes([b])))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertEqual(decoder.newlines, None)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        decoder.reset()
        # Named ``data`` rather than ``input`` to avoid shadowing the builtin.
        data = "abc"
        if encoder is not None:
            encoder.reset()
            data = encoder.encode(data)
        self.assertEqual(decoder.decode(data), "abc")
        # reset() must also have cleared the record of newlines seen.
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(decoder, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 are decoded unchanged and must not be
            # recorded as newlines.
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
            self.assertEqual(dec.newlines, None)
        dec = self.IncrementalNewlineDecoder(None, translate=False)
        _check(dec)
        dec = self.IncrementalNewlineDecoder(None, translate=True)
        _check(dec)
2315 _check(dec)
2316
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Runs the inherited tests unchanged. test_main() copies the ``io``
    # namespace (including IncrementalNewlineDecoder) onto every test class
    # whose name starts with "C".
    pass
2319
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Runs the inherited tests unchanged. test_main() copies the ``pyio``
    # namespace (including IncrementalNewlineDecoder) onto every test class
    # whose name starts with "Py".
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002322
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002323
Guido van Rossum01a27522007-03-07 01:00:12 +00002324# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002325
class MiscIOTest(unittest.TestCase):
    """Miscellaneous checks on an io implementation as a whole.

    The module under test is read from ``self.io``, which the concrete
    subclasses define. ``self.open``, ``self.IOBase`` and the other io
    names used below are set on those subclasses by test_main().
    """

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Everything exported in __all__ must exist and be an exception
        # class, a SEEK_* constant, open(), or an IOBase subclass.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        # Check the ``mode`` and ``name`` attributes at each layer of the
        # stack. Files are opened in ``with`` blocks so that a failing
        # assertion cannot leak an open handle into tearDown().
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second file object on the same descriptor; closefd=False
            # leaves ownership of the descriptor with ``f``.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        # Every I/O method must raise ValueError once the file is closed,
        # for each combination of mode and buffering below.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek/read1/readinto are not present on every layer, so they
            # are only checked where the object provides them.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Use the argument type matching the file's mode so that a
            # TypeError cannot mask the expected ValueError.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Build a reference cycle between the exception and its argument,
        # then check that the collector can reclaim it.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        # *abcmodule* is any object exposing IOBase, RawIOBase,
        # BufferedIOBase and TextIOBase attributes. Each kind of file
        # returned by open() must be an instance of IOBase and of exactly
        # one of the three more specific ABCs.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        # (the test instance itself carries the ABCs as attributes).
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)
2461
class CMiscIOTest(MiscIOTest):
    # Point the inherited tests (which read ``self.io``) at the module-level
    # ``io`` module.
    io = io
2464
class PyMiscIOTest(MiscIOTest):
    # Point the inherited tests (which read ``self.io``) at the module-level
    # ``pyio`` module.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002467
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002468
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Check that signal handlers run when a write is interrupted.

    The module under test is read from ``self.io``, which the concrete
    subclasses define.
    """

    def setUp(self):
        # Install our SIGALRM handler, remembering the previous one.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError: the tests detect that the
        # handler ran by expecting this exception from write().
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler.

        *item* is the unit of data to write (repeated 1024 * 1024 times),
        *bytes* is the byte string whose first two bytes are expected to
        come out of the pipe, and *fdopen_kwargs* are passed to
        ``self.io.open()`` when wrapping the write end of the pipe.
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        # Bound before the try block so the cleanup below never hits an
        # unbound name if open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                              wio.write, item * (1024 * 1024))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            # Cancel any alarm that has not fired yet (e.g. the test failed
            # early) so it cannot go off after tearDown() has restored the
            # previous handler.
            signal.alarm(0)
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2529
class CSignalsTest(SignalsTest):
    # Point the inherited tests (which call ``self.io.open``) at the
    # module-level ``io`` module.
    io = io
2532
class PySignalsTest(SignalsTest):
    # Point the inherited tests (which call ``self.io.open``) at the
    # module-level ``pyio`` module.
    io = pyio
2535
2536
def test_main():
    """Inject the io namespaces into the test classes and run them all.

    Classes whose name starts with "C" receive the names from ``io``;
    classes whose name starts with "Py" receive those from ``pyio``.
    Classes matching neither prefix are run as they are.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    module_globals = globals()

    namespaces = {}
    for prefix, module in (("C", io), ("Py", pyio)):
        namespace = {member: getattr(module, member)
                     for member in all_members}
        # Each mock has a "C..." and a "Py..." variant at module level; it
        # is exposed to the tests under its unprefixed name.
        for mock in mocks:
            namespace[mock.__name__] = module_globals[prefix + mock.__name__]
        namespaces[prefix] = namespace
    # Avoid turning open into a bound method.
    namespaces["Py"]["open"] = pyio.OpenWrapper

    for test in tests:
        test_name = test.__name__
        if test_name.startswith("C"):
            injected = namespaces["C"]
        elif test_name.startswith("Py"):
            injected = namespaces["Py"]
        else:
            continue
        for attr_name, attr_value in injected.items():
            setattr(test, attr_name, attr_value)

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002571
# Run the full suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()