blob: 4c7ce311418db92575e616fd67d5e7c4c9c163d8 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
Antoine Pitrou16b11de2010-08-21 19:17:57 +000033import signal
34import errno
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from itertools import chain, cycle, count
36from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000038
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000039import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000042
Guido van Rossuma9e20242007-03-08 00:43:48 +000043
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000044def _default_chunk_size():
45 """Get the default TextIOWrapper chunk size"""
46 with open(__file__, "r", encoding="latin1") as f:
47 return f._CHUNK_SIZE
48
49
Antoine Pitroue5e75c62010-09-14 18:53:07 +000050class MockRawIOWithoutRead:
51 """A RawIO implementation without read(), so as to exercise the default
52 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000053
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000054 def __init__(self, read_stack=()):
55 self._read_stack = list(read_stack)
56 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000057 self._reads = 0
Antoine Pitrou00091ca2010-08-11 13:38:10 +000058 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000059
Guido van Rossum01a27522007-03-07 01:00:12 +000060 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000061 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000062 return len(b)
63
64 def writable(self):
65 return True
66
Guido van Rossum68bbcd22007-02-27 17:19:33 +000067 def fileno(self):
68 return 42
69
70 def readable(self):
71 return True
72
Guido van Rossum01a27522007-03-07 01:00:12 +000073 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000074 return True
75
Guido van Rossum01a27522007-03-07 01:00:12 +000076 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000077 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000078
79 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000080 return 0 # same comment as above
81
82 def readinto(self, buf):
83 self._reads += 1
84 max_len = len(buf)
85 try:
86 data = self._read_stack[0]
87 except IndexError:
Antoine Pitrou00091ca2010-08-11 13:38:10 +000088 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000089 return 0
90 if data is None:
91 del self._read_stack[0]
92 return None
93 n = len(data)
94 if len(data) <= max_len:
95 del self._read_stack[0]
96 buf[:n] = data
97 return n
98 else:
99 buf[:] = data[:max_len]
100 self._read_stack[0] = data[max_len:]
101 return max_len
102
103 def truncate(self, pos=None):
104 return pos
105
Antoine Pitroue5e75c62010-09-14 18:53:07 +0000106class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
107 pass
108
109class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
110 pass
111
112
113class MockRawIO(MockRawIOWithoutRead):
114
115 def read(self, n=None):
116 self._reads += 1
117 try:
118 return self._read_stack.pop(0)
119 except:
120 self._extraneous_reads += 1
121 return b""
122
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000123class CMockRawIO(MockRawIO, io.RawIOBase):
124 pass
125
126class PyMockRawIO(MockRawIO, pyio.RawIOBase):
127 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000128
Guido van Rossuma9e20242007-03-08 00:43:48 +0000129
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000130class MisbehavedRawIO(MockRawIO):
131 def write(self, b):
132 return super().write(b) * 2
133
134 def read(self, n=None):
135 return super().read(n) * 2
136
137 def seek(self, pos, whence):
138 return -123
139
140 def tell(self):
141 return -456
142
143 def readinto(self, buf):
144 super().readinto(buf)
145 return len(buf) * 5
146
147class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
148 pass
149
150class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
151 pass
152
153
154class CloseFailureIO(MockRawIO):
155 closed = 0
156
157 def close(self):
158 if not self.closed:
159 self.closed = 1
160 raise IOError
161
162class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
163 pass
164
165class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
166 pass
167
168
169class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000170
171 def __init__(self, data):
172 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000173 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000174
175 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000176 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000177 self.read_history.append(None if res is None else len(res))
178 return res
179
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 def readinto(self, b):
181 res = super().readinto(b)
182 self.read_history.append(res)
183 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000184
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000185class CMockFileIO(MockFileIO, io.BytesIO):
186 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188class PyMockFileIO(MockFileIO, pyio.BytesIO):
189 pass
190
191
192class MockNonBlockWriterIO:
193
194 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000195 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000196 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000197
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000198 def pop_written(self):
199 s = b"".join(self._write_stack)
200 self._write_stack[:] = []
201 return s
202
203 def block_on(self, char):
204 """Block when a given char is encountered."""
205 self._blocker_char = char
206
207 def readable(self):
208 return True
209
210 def seekable(self):
211 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000212
Guido van Rossum01a27522007-03-07 01:00:12 +0000213 def writable(self):
214 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000215
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000216 def write(self, b):
217 b = bytes(b)
218 n = -1
219 if self._blocker_char:
220 try:
221 n = b.index(self._blocker_char)
222 except ValueError:
223 pass
224 else:
225 self._blocker_char = None
226 self._write_stack.append(b[:n])
227 raise self.BlockingIOError(0, "test blocking", n)
228 self._write_stack.append(b)
229 return len(b)
230
231class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
232 BlockingIOError = io.BlockingIOError
233
234class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
235 BlockingIOError = pyio.BlockingIOError
236
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
Guido van Rossum28524c72007-02-27 05:47:44 +0000238class IOTest(unittest.TestCase):
239
Neal Norwitze7789b12008-03-24 06:18:09 +0000240 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000241 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000242
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000243 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000244 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000245
Guido van Rossum28524c72007-02-27 05:47:44 +0000246 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000247 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +0000248 f.truncate(0)
249 self.assertEqual(f.tell(), 5)
250 f.seek(0)
251
252 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000253 self.assertEqual(f.seek(0), 0)
254 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000255 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000256 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000257 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000258 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000259 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000260 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000261 self.assertEqual(f.seek(-1, 2), 13)
262 self.assertEqual(f.tell(), 13)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +0000263
Guido van Rossum87429772007-04-10 21:06:59 +0000264 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +0000265 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000266 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000267
Guido van Rossum9b76da62007-04-11 01:09:03 +0000268 def read_ops(self, f, buffered=False):
269 data = f.read(5)
270 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000271 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000272 self.assertEqual(f.readinto(data), 5)
273 self.assertEqual(data, b" worl")
274 self.assertEqual(f.readinto(data), 2)
275 self.assertEqual(len(data), 5)
276 self.assertEqual(data[:2], b"d\n")
277 self.assertEqual(f.seek(0), 0)
278 self.assertEqual(f.read(20), b"hello world\n")
279 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000280 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000281 self.assertEqual(f.seek(-6, 2), 6)
282 self.assertEqual(f.read(5), b"world")
283 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000284 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000285 self.assertEqual(f.seek(-6, 1), 5)
286 self.assertEqual(f.read(5), b" worl")
287 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000288 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000289 if buffered:
290 f.seek(0)
291 self.assertEqual(f.read(), b"hello world\n")
292 f.seek(6)
293 self.assertEqual(f.read(), b"world\n")
294 self.assertEqual(f.read(), b"")
295
Guido van Rossum34d69e52007-04-10 20:08:41 +0000296 LARGE = 2**31
297
Guido van Rossum53807da2007-04-10 19:01:47 +0000298 def large_file_ops(self, f):
299 assert f.readable()
300 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000301 self.assertEqual(f.seek(self.LARGE), self.LARGE)
302 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000303 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000304 self.assertEqual(f.tell(), self.LARGE + 3)
305 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000306 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000307 self.assertEqual(f.tell(), self.LARGE + 2)
308 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000309 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +0000310 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000311 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
312 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000313 self.assertEqual(f.read(2), b"x")
314
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000315 def test_invalid_operations(self):
316 # Try writing on a file opened in read mode and vice-versa.
317 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000318 with self.open(support.TESTFN, mode) as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000319 self.assertRaises(IOError, fp.read)
320 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000321 with self.open(support.TESTFN, "rb") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000322 self.assertRaises(IOError, fp.write, b"blah")
323 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000324 with self.open(support.TESTFN, "r") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000325 self.assertRaises(IOError, fp.write, "blah")
326 self.assertRaises(IOError, fp.writelines, ["blah\n"])
327
Guido van Rossum28524c72007-02-27 05:47:44 +0000328 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000329 with self.open(support.TESTFN, "wb", buffering=0) as f:
330 self.assertEqual(f.readable(), False)
331 self.assertEqual(f.writable(), True)
332 self.assertEqual(f.seekable(), True)
333 self.write_ops(f)
334 with self.open(support.TESTFN, "rb", buffering=0) as f:
335 self.assertEqual(f.readable(), True)
336 self.assertEqual(f.writable(), False)
337 self.assertEqual(f.seekable(), True)
338 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000339
Guido van Rossum87429772007-04-10 21:06:59 +0000340 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000341 with self.open(support.TESTFN, "wb") as f:
342 self.assertEqual(f.readable(), False)
343 self.assertEqual(f.writable(), True)
344 self.assertEqual(f.seekable(), True)
345 self.write_ops(f)
346 with self.open(support.TESTFN, "rb") as f:
347 self.assertEqual(f.readable(), True)
348 self.assertEqual(f.writable(), False)
349 self.assertEqual(f.seekable(), True)
350 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000351
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000352 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000353 with self.open(support.TESTFN, "wb") as f:
354 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
355 with self.open(support.TESTFN, "rb") as f:
356 self.assertEqual(f.readline(), b"abc\n")
357 self.assertEqual(f.readline(10), b"def\n")
358 self.assertEqual(f.readline(2), b"xy")
359 self.assertEqual(f.readline(4), b"zzy\n")
360 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Peterson6b59f772009-12-13 19:30:15 +0000361 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000362 self.assertRaises(TypeError, f.readline, 5.3)
363 with self.open(support.TESTFN, "r") as f:
364 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000365
Guido van Rossum28524c72007-02-27 05:47:44 +0000366 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000367 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000368 self.write_ops(f)
369 data = f.getvalue()
370 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000371 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000372 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000373
Guido van Rossum53807da2007-04-10 19:01:47 +0000374 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000375 # On Windows and Mac OSX this test comsumes large resources; It takes
376 # a long time to build the >2GB file and takes >2GB of disk space
377 # therefore the resource must be enabled to run this test.
378 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000379 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000380 print("\nTesting large file ops skipped on %s." % sys.platform,
381 file=sys.stderr)
382 print("It requires %d bytes and a long time." % self.LARGE,
383 file=sys.stderr)
384 print("Use 'regrtest.py -u largefile test_io' to run it.",
385 file=sys.stderr)
386 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000387 with self.open(support.TESTFN, "w+b", 0) as f:
388 self.large_file_ops(f)
389 with self.open(support.TESTFN, "w+b") as f:
390 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000391
392 def test_with_open(self):
393 for bufsize in (0, 1, 100):
394 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000395 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000396 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000397 self.assertEqual(f.closed, True)
398 f = None
399 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000400 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000401 1/0
402 except ZeroDivisionError:
403 self.assertEqual(f.closed, True)
404 else:
405 self.fail("1/0 didn't raise an exception")
406
Antoine Pitrou08838b62009-01-21 00:55:13 +0000407 # issue 5008
408 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000409 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000410 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000411 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000412 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000413 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000414 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000415 with self.open(support.TESTFN, "a") as f:
Georg Brandlab91fde2009-08-13 08:51:18 +0000416 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000417
Guido van Rossum87429772007-04-10 21:06:59 +0000418 def test_destructor(self):
419 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000420 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000421 def __del__(self):
422 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000423 try:
424 f = super().__del__
425 except AttributeError:
426 pass
427 else:
428 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000429 def close(self):
430 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000431 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000432 def flush(self):
433 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000434 super().flush()
435 f = MyFileIO(support.TESTFN, "wb")
436 f.write(b"xxx")
437 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000438 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000439 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000440 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000441 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000442
443 def _check_base_destructor(self, base):
444 record = []
445 class MyIO(base):
446 def __init__(self):
447 # This exercises the availability of attributes on object
448 # destruction.
449 # (in the C version, close() is called by the tp_dealloc
450 # function, not by __del__)
451 self.on_del = 1
452 self.on_close = 2
453 self.on_flush = 3
454 def __del__(self):
455 record.append(self.on_del)
456 try:
457 f = super().__del__
458 except AttributeError:
459 pass
460 else:
461 f()
462 def close(self):
463 record.append(self.on_close)
464 super().close()
465 def flush(self):
466 record.append(self.on_flush)
467 super().flush()
468 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000469 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000470 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000471 self.assertEqual(record, [1, 2, 3])
472
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000473 def test_IOBase_destructor(self):
474 self._check_base_destructor(self.IOBase)
475
476 def test_RawIOBase_destructor(self):
477 self._check_base_destructor(self.RawIOBase)
478
479 def test_BufferedIOBase_destructor(self):
480 self._check_base_destructor(self.BufferedIOBase)
481
482 def test_TextIOBase_destructor(self):
483 self._check_base_destructor(self.TextIOBase)
484
Guido van Rossum87429772007-04-10 21:06:59 +0000485 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000486 with self.open(support.TESTFN, "wb") as f:
487 f.write(b"xxx")
488 with self.open(support.TESTFN, "rb") as f:
489 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000490
Guido van Rossumd4103952007-04-12 05:44:49 +0000491 def test_array_writes(self):
492 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000493 n = len(a.tostring())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000494 with self.open(support.TESTFN, "wb", 0) as f:
495 self.assertEqual(f.write(a), n)
496 with self.open(support.TESTFN, "wb") as f:
497 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000498
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000499 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000500 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000501 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000502
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000503 def test_read_closed(self):
504 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000505 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000506 with self.open(support.TESTFN, "r") as f:
507 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000508 self.assertEqual(file.read(), "egg\n")
509 file.seek(0)
510 file.close()
511 self.assertRaises(ValueError, file.read)
512
513 def test_no_closefd_with_filename(self):
514 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000515 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000516
517 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000518 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000519 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000520 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000521 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000522 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000523 self.assertEqual(file.buffer.raw.closefd, False)
524
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000525 def test_garbage_collection(self):
526 # FileIO objects are collected, and collecting them flushes
527 # all data to disk.
528 f = self.FileIO(support.TESTFN, "wb")
529 f.write(b"abcxxx")
530 f.f = f
531 wr = weakref.ref(f)
532 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000533 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +0000534 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000535 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000536 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000537
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000538 def test_unbounded_file(self):
539 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
540 zero = "/dev/zero"
541 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000542 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000543 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000544 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000545 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000546 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000547 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000548 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000549 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000550 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000551 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000552 self.assertRaises(OverflowError, f.read)
553
Antoine Pitroufaf90072010-05-03 16:58:19 +0000554 def test_flush_error_on_close(self):
555 f = self.open(support.TESTFN, "wb", buffering=0)
556 def bad_flush():
557 raise IOError()
558 f.flush = bad_flush
559 self.assertRaises(IOError, f.close) # exception not swallowed
560
561 def test_multi_close(self):
562 f = self.open(support.TESTFN, "wb", buffering=0)
563 f.close()
564 f.close()
565 f.close()
566 self.assertRaises(ValueError, f.flush)
567
Antoine Pitroue5e75c62010-09-14 18:53:07 +0000568 def test_RawIOBase_read(self):
569 # Exercise the default RawIOBase.read() implementation (which calls
570 # readinto() internally).
571 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
572 self.assertEqual(rawio.read(2), b"ab")
573 self.assertEqual(rawio.read(2), b"c")
574 self.assertEqual(rawio.read(2), b"d")
575 self.assertEqual(rawio.read(2), None)
576 self.assertEqual(rawio.read(2), b"ef")
577 self.assertEqual(rawio.read(2), b"g")
578 self.assertEqual(rawio.read(2), None)
579 self.assertEqual(rawio.read(2), b"")
580
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000581class CIOTest(IOTest):
582 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000583
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000584class PyIOTest(IOTest):
585 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000586
Guido van Rossuma9e20242007-03-08 00:43:48 +0000587
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000588class CommonBufferedTests:
589 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
590
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000591 def test_detach(self):
592 raw = self.MockRawIO()
593 buf = self.tp(raw)
594 self.assertIs(buf.detach(), raw)
595 self.assertRaises(ValueError, buf.detach)
596
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000597 def test_fileno(self):
598 rawio = self.MockRawIO()
599 bufio = self.tp(rawio)
600
601 self.assertEquals(42, bufio.fileno())
602
603 def test_no_fileno(self):
604 # XXX will we always have fileno() function? If so, kill
605 # this test. Else, write it.
606 pass
607
608 def test_invalid_args(self):
609 rawio = self.MockRawIO()
610 bufio = self.tp(rawio)
611 # Invalid whence
612 self.assertRaises(ValueError, bufio.seek, 0, -1)
613 self.assertRaises(ValueError, bufio.seek, 0, 3)
614
615 def test_override_destructor(self):
616 tp = self.tp
617 record = []
618 class MyBufferedIO(tp):
619 def __del__(self):
620 record.append(1)
621 try:
622 f = super().__del__
623 except AttributeError:
624 pass
625 else:
626 f()
627 def close(self):
628 record.append(2)
629 super().close()
630 def flush(self):
631 record.append(3)
632 super().flush()
633 rawio = self.MockRawIO()
634 bufio = MyBufferedIO(rawio)
635 writable = bufio.writable()
636 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000637 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000638 if writable:
639 self.assertEqual(record, [1, 2, 3])
640 else:
641 self.assertEqual(record, [1, 2])
642
643 def test_context_manager(self):
644 # Test usability as a context manager
645 rawio = self.MockRawIO()
646 bufio = self.tp(rawio)
647 def _with():
648 with bufio:
649 pass
650 _with()
651 # bufio should now be closed, and using it a second time should raise
652 # a ValueError.
653 self.assertRaises(ValueError, _with)
654
655 def test_error_through_destructor(self):
656 # Test that the exception state is not modified by a destructor,
657 # even if close() fails.
658 rawio = self.CloseFailureIO()
659 def f():
660 self.tp(rawio).xyzzy
661 with support.captured_output("stderr") as s:
662 self.assertRaises(AttributeError, f)
663 s = s.getvalue().strip()
664 if s:
665 # The destructor *may* have printed an unraisable error, check it
666 self.assertEqual(len(s.splitlines()), 1)
Georg Brandlab91fde2009-08-13 08:51:18 +0000667 self.assertTrue(s.startswith("Exception IOError: "), s)
668 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000669
Antoine Pitrou716c4442009-05-23 19:04:03 +0000670 def test_repr(self):
671 raw = self.MockRawIO()
672 b = self.tp(raw)
673 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
674 self.assertEqual(repr(b), "<%s>" % clsname)
675 raw.name = "dummy"
676 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
677 raw.name = b"dummy"
678 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
679
Antoine Pitroufaf90072010-05-03 16:58:19 +0000680 def test_flush_error_on_close(self):
681 raw = self.MockRawIO()
682 def bad_flush():
683 raise IOError()
684 raw.flush = bad_flush
685 b = self.tp(raw)
686 self.assertRaises(IOError, b.close) # exception not swallowed
687
688 def test_multi_close(self):
689 raw = self.MockRawIO()
690 b = self.tp(raw)
691 b.close()
692 b.close()
693 b.close()
694 self.assertRaises(ValueError, b.flush)
695
Guido van Rossum78892e42007-04-06 17:31:18 +0000696
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000697class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
698 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000699
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000700 def test_constructor(self):
701 rawio = self.MockRawIO([b"abc"])
702 bufio = self.tp(rawio)
703 bufio.__init__(rawio)
704 bufio.__init__(rawio, buffer_size=1024)
705 bufio.__init__(rawio, buffer_size=16)
706 self.assertEquals(b"abc", bufio.read())
707 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
708 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
709 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
710 rawio = self.MockRawIO([b"abc"])
711 bufio.__init__(rawio)
712 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000713
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000714 def test_read(self):
Benjamin Peterson6b59f772009-12-13 19:30:15 +0000715 for arg in (None, 7):
716 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
717 bufio = self.tp(rawio)
718 self.assertEquals(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000719 # Invalid args
720 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000721
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000722 def test_read1(self):
723 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
724 bufio = self.tp(rawio)
725 self.assertEquals(b"a", bufio.read(1))
726 self.assertEquals(b"b", bufio.read1(1))
727 self.assertEquals(rawio._reads, 1)
728 self.assertEquals(b"c", bufio.read1(100))
729 self.assertEquals(rawio._reads, 1)
730 self.assertEquals(b"d", bufio.read1(100))
731 self.assertEquals(rawio._reads, 2)
732 self.assertEquals(b"efg", bufio.read1(100))
733 self.assertEquals(rawio._reads, 3)
734 self.assertEquals(b"", bufio.read1(100))
Benjamin Peterson6b59f772009-12-13 19:30:15 +0000735 self.assertEquals(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000736 # Invalid args
737 self.assertRaises(ValueError, bufio.read1, -1)
738
739 def test_readinto(self):
740 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
741 bufio = self.tp(rawio)
742 b = bytearray(2)
743 self.assertEquals(bufio.readinto(b), 2)
744 self.assertEquals(b, b"ab")
745 self.assertEquals(bufio.readinto(b), 2)
746 self.assertEquals(b, b"cd")
747 self.assertEquals(bufio.readinto(b), 2)
748 self.assertEquals(b, b"ef")
749 self.assertEquals(bufio.readinto(b), 1)
750 self.assertEquals(b, b"gf")
751 self.assertEquals(bufio.readinto(b), 0)
752 self.assertEquals(b, b"gf")
753
Benjamin Peterson6b59f772009-12-13 19:30:15 +0000754 def test_readlines(self):
755 def bufio():
756 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
757 return self.tp(rawio)
758 self.assertEquals(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
759 self.assertEquals(bufio().readlines(5), [b"abc\n", b"d\n"])
760 self.assertEquals(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
761
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000762 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000763 data = b"abcdefghi"
764 dlen = len(data)
765
766 tests = [
767 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
768 [ 100, [ 3, 3, 3], [ dlen ] ],
769 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
770 ]
771
772 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000773 rawio = self.MockFileIO(data)
774 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000775 pos = 0
776 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000777 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000778 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000779 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000780 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000781
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000782 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000783 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000784 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
785 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000786
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000787 self.assertEquals(b"abcd", bufio.read(6))
788 self.assertEquals(b"e", bufio.read(1))
789 self.assertEquals(b"fg", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000790 self.assertEquals(b"", bufio.peek(1))
Georg Brandlab91fde2009-08-13 08:51:18 +0000791 self.assertTrue(None is bufio.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000792 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000793
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000794 def test_read_past_eof(self):
795 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
796 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000797
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000798 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000799
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000800 def test_read_all(self):
801 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
802 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000803
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000804 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000805
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000806 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000807 try:
808 # Write out many bytes with exactly the same number of 0's,
809 # 1's... 255's. This will help us check that concurrent reading
810 # doesn't duplicate or forget contents.
811 N = 1000
812 l = list(range(256)) * N
813 random.shuffle(l)
814 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000815 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000816 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000817 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000818 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000819 errors = []
820 results = []
821 def f():
822 try:
823 # Intra-buffer read then buffer-flushing read
824 for n in cycle([1, 19]):
825 s = bufio.read(n)
826 if not s:
827 break
828 # list.append() is atomic
829 results.append(s)
830 except Exception as e:
831 errors.append(e)
832 raise
833 threads = [threading.Thread(target=f) for x in range(20)]
834 for t in threads:
835 t.start()
836 time.sleep(0.02) # yield
837 for t in threads:
838 t.join()
839 self.assertFalse(errors,
840 "the following exceptions were caught: %r" % errors)
841 s = b''.join(results)
842 for i in range(256):
843 c = bytes(bytearray([i]))
844 self.assertEqual(s.count(c), N)
845 finally:
846 support.unlink(support.TESTFN)
847
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000848 def test_misbehaved_io(self):
849 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
850 bufio = self.tp(rawio)
851 self.assertRaises(IOError, bufio.seek, 0)
852 self.assertRaises(IOError, bufio.tell)
853
Antoine Pitrou00091ca2010-08-11 13:38:10 +0000854 def test_no_extraneous_read(self):
855 # Issue #9550; when the raw IO object has satisfied the read request,
856 # we should not issue any additional reads, otherwise it may block
857 # (e.g. socket).
858 bufsize = 16
859 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
860 rawio = self.MockRawIO([b"x" * n])
861 bufio = self.tp(rawio, bufsize)
862 self.assertEqual(bufio.read(n), b"x" * n)
863 # Simple case: one raw read is enough to satisfy the request.
864 self.assertEqual(rawio._extraneous_reads, 0,
865 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
866 # A more complex case where two raw reads are needed to satisfy
867 # the request.
868 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
869 bufio = self.tp(rawio, bufsize)
870 self.assertEqual(bufio.read(n), b"x" * n)
871 self.assertEqual(rawio._extraneous_reads, 0,
872 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
873
874
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000875class CBufferedReaderTest(BufferedReaderTest):
876 tp = io.BufferedReader
877
878 def test_constructor(self):
879 BufferedReaderTest.test_constructor(self)
880 # The allocation can succeed on 32-bit builds, e.g. with more
881 # than 2GB RAM and a 64-bit kernel.
882 if sys.maxsize > 0x7FFFFFFF:
883 rawio = self.MockRawIO()
884 bufio = self.tp(rawio)
885 self.assertRaises((OverflowError, MemoryError, ValueError),
886 bufio.__init__, rawio, sys.maxsize)
887
888 def test_initialization(self):
889 rawio = self.MockRawIO([b"abc"])
890 bufio = self.tp(rawio)
891 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
892 self.assertRaises(ValueError, bufio.read)
893 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
894 self.assertRaises(ValueError, bufio.read)
895 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
896 self.assertRaises(ValueError, bufio.read)
897
898 def test_misbehaved_io_read(self):
899 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
900 bufio = self.tp(rawio)
901 # _pyio.BufferedReader seems to implement reading different, so that
902 # checking this is not so easy.
903 self.assertRaises(IOError, bufio.read, 10)
904
905 def test_garbage_collection(self):
906 # C BufferedReader objects are collected.
907 # The Python version has __del__, so it ends into gc.garbage instead
908 rawio = self.FileIO(support.TESTFN, "w+b")
909 f = self.tp(rawio)
910 f.f = f
911 wr = weakref.ref(f)
912 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000913 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +0000914 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000915
916class PyBufferedReaderTest(BufferedReaderTest):
917 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000918
Guido van Rossuma9e20242007-03-08 00:43:48 +0000919
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000920class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
921 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000922
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000923 def test_constructor(self):
924 rawio = self.MockRawIO()
925 bufio = self.tp(rawio)
926 bufio.__init__(rawio)
927 bufio.__init__(rawio, buffer_size=1024)
928 bufio.__init__(rawio, buffer_size=16)
929 self.assertEquals(3, bufio.write(b"abc"))
930 bufio.flush()
931 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
932 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
933 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
934 bufio.__init__(rawio)
935 self.assertEquals(3, bufio.write(b"ghi"))
936 bufio.flush()
937 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
938
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000939 def test_detach_flush(self):
940 raw = self.MockRawIO()
941 buf = self.tp(raw)
942 buf.write(b"howdy!")
943 self.assertFalse(raw._write_stack)
944 buf.detach()
945 self.assertEqual(raw._write_stack, [b"howdy!"])
946
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000947 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000948 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000949 writer = self.MockRawIO()
950 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000951 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000952 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000953
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000954 def test_write_overflow(self):
955 writer = self.MockRawIO()
956 bufio = self.tp(writer, 8)
957 contents = b"abcdefghijklmnop"
958 for n in range(0, len(contents), 3):
959 bufio.write(contents[n:n+3])
960 flushed = b"".join(writer._write_stack)
961 # At least (total - 8) bytes were implicitly flushed, perhaps more
962 # depending on the implementation.
Georg Brandlab91fde2009-08-13 08:51:18 +0000963 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000964
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000965 def check_writes(self, intermediate_func):
966 # Lots of writes, test the flushed output is as expected.
967 contents = bytes(range(256)) * 1000
968 n = 0
969 writer = self.MockRawIO()
970 bufio = self.tp(writer, 13)
971 # Generator of write sizes: repeat each N 15 times then proceed to N+1
972 def gen_sizes():
973 for size in count(1):
974 for i in range(15):
975 yield size
976 sizes = gen_sizes()
977 while n < len(contents):
978 size = min(next(sizes), len(contents) - n)
979 self.assertEquals(bufio.write(contents[n:n+size]), size)
980 intermediate_func(bufio)
981 n += size
982 bufio.flush()
983 self.assertEquals(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000984
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000985 def test_writes(self):
986 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000987
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000988 def test_writes_and_flushes(self):
989 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +0000990
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000991 def test_writes_and_seeks(self):
992 def _seekabs(bufio):
993 pos = bufio.tell()
994 bufio.seek(pos + 1, 0)
995 bufio.seek(pos - 1, 0)
996 bufio.seek(pos, 0)
997 self.check_writes(_seekabs)
998 def _seekrel(bufio):
999 pos = bufio.seek(0, 1)
1000 bufio.seek(+1, 1)
1001 bufio.seek(-1, 1)
1002 bufio.seek(pos, 0)
1003 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001004
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001005 def test_writes_and_truncates(self):
1006 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001007
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001008 def test_write_non_blocking(self):
1009 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001010 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001011
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001012 self.assertEquals(bufio.write(b"abcd"), 4)
1013 self.assertEquals(bufio.write(b"efghi"), 5)
1014 # 1 byte will be written, the rest will be buffered
1015 raw.block_on(b"k")
1016 self.assertEquals(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001017
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001018 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1019 raw.block_on(b"0")
1020 try:
1021 bufio.write(b"opqrwxyz0123456789")
1022 except self.BlockingIOError as e:
1023 written = e.characters_written
1024 else:
1025 self.fail("BlockingIOError should have been raised")
1026 self.assertEquals(written, 16)
1027 self.assertEquals(raw.pop_written(),
1028 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001029
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001030 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
1031 s = raw.pop_written()
1032 # Previously buffered bytes were flushed
1033 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001034
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001035 def test_write_and_rewind(self):
1036 raw = io.BytesIO()
1037 bufio = self.tp(raw, 4)
1038 self.assertEqual(bufio.write(b"abcdef"), 6)
1039 self.assertEqual(bufio.tell(), 6)
1040 bufio.seek(0, 0)
1041 self.assertEqual(bufio.write(b"XY"), 2)
1042 bufio.seek(6, 0)
1043 self.assertEqual(raw.getvalue(), b"XYcdef")
1044 self.assertEqual(bufio.write(b"123456"), 6)
1045 bufio.flush()
1046 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001047
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001048 def test_flush(self):
1049 writer = self.MockRawIO()
1050 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001051 bufio.write(b"abc")
1052 bufio.flush()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001053 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001054
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001055 def test_destructor(self):
1056 writer = self.MockRawIO()
1057 bufio = self.tp(writer, 8)
1058 bufio.write(b"abc")
1059 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001060 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001061 self.assertEquals(b"abc", writer._write_stack[0])
1062
1063 def test_truncate(self):
1064 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001065 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001066 bufio = self.tp(raw, 8)
1067 bufio.write(b"abcdef")
1068 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +00001069 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001070 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001071 self.assertEqual(f.read(), b"abc")
1072
1073 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001074 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001075 # Write out many bytes from many threads and test they were
1076 # all flushed.
1077 N = 1000
1078 contents = bytes(range(256)) * N
1079 sizes = cycle([1, 19])
1080 n = 0
1081 queue = deque()
1082 while n < len(contents):
1083 size = next(sizes)
1084 queue.append(contents[n:n+size])
1085 n += size
1086 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001087 # We use a real file object because it allows us to
1088 # exercise situations where the GIL is released before
1089 # writing the buffer to the raw streams. This is in addition
1090 # to concurrency issues due to switching threads in the middle
1091 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001092 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001093 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001094 errors = []
1095 def f():
1096 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001097 while True:
1098 try:
1099 s = queue.popleft()
1100 except IndexError:
1101 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001102 bufio.write(s)
1103 except Exception as e:
1104 errors.append(e)
1105 raise
1106 threads = [threading.Thread(target=f) for x in range(20)]
1107 for t in threads:
1108 t.start()
1109 time.sleep(0.02) # yield
1110 for t in threads:
1111 t.join()
1112 self.assertFalse(errors,
1113 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001114 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001115 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001116 s = f.read()
1117 for i in range(256):
1118 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001119 finally:
1120 support.unlink(support.TESTFN)
1121
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001122 def test_misbehaved_io(self):
1123 rawio = self.MisbehavedRawIO()
1124 bufio = self.tp(rawio, 5)
1125 self.assertRaises(IOError, bufio.seek, 0)
1126 self.assertRaises(IOError, bufio.tell)
1127 self.assertRaises(IOError, bufio.write, b"abcdef")
1128
Benjamin Peterson59406a92009-03-26 17:10:29 +00001129 def test_max_buffer_size_deprecation(self):
1130 with support.check_warnings() as w:
1131 warnings.simplefilter("always", DeprecationWarning)
1132 self.tp(self.MockRawIO(), 8, 12)
1133 self.assertEqual(len(w.warnings), 1)
1134 warning = w.warnings[0]
1135 self.assertTrue(warning.category is DeprecationWarning)
1136 self.assertEqual(str(warning.message),
1137 "max_buffer_size is deprecated")
1138
1139
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001140class CBufferedWriterTest(BufferedWriterTest):
1141 tp = io.BufferedWriter
1142
1143 def test_constructor(self):
1144 BufferedWriterTest.test_constructor(self)
1145 # The allocation can succeed on 32-bit builds, e.g. with more
1146 # than 2GB RAM and a 64-bit kernel.
1147 if sys.maxsize > 0x7FFFFFFF:
1148 rawio = self.MockRawIO()
1149 bufio = self.tp(rawio)
1150 self.assertRaises((OverflowError, MemoryError, ValueError),
1151 bufio.__init__, rawio, sys.maxsize)
1152
1153 def test_initialization(self):
1154 rawio = self.MockRawIO()
1155 bufio = self.tp(rawio)
1156 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1157 self.assertRaises(ValueError, bufio.write, b"def")
1158 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1159 self.assertRaises(ValueError, bufio.write, b"def")
1160 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1161 self.assertRaises(ValueError, bufio.write, b"def")
1162
1163 def test_garbage_collection(self):
1164 # C BufferedWriter objects are collected, and collecting them flushes
1165 # all data to disk.
1166 # The Python version has __del__, so it ends into gc.garbage instead
1167 rawio = self.FileIO(support.TESTFN, "w+b")
1168 f = self.tp(rawio)
1169 f.write(b"123xxx")
1170 f.x = f
1171 wr = weakref.ref(f)
1172 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001173 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +00001174 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001175 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001176 self.assertEqual(f.read(), b"123xxx")
1177
1178
1179class PyBufferedWriterTest(BufferedWriterTest):
1180 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001181
Guido van Rossum01a27522007-03-07 01:00:12 +00001182class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001183
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001184 def test_constructor(self):
1185 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001186 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001187
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001188 def test_detach(self):
1189 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1190 self.assertRaises(self.UnsupportedOperation, pair.detach)
1191
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001192 def test_constructor_max_buffer_size_deprecation(self):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001193 with support.check_warnings() as w:
1194 warnings.simplefilter("always", DeprecationWarning)
1195 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1196 self.assertEqual(len(w.warnings), 1)
1197 warning = w.warnings[0]
1198 self.assertTrue(warning.category is DeprecationWarning)
1199 self.assertEqual(str(warning.message),
1200 "max_buffer_size is deprecated")
1201
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001202 def test_constructor_with_not_readable(self):
1203 class NotReadable(MockRawIO):
1204 def readable(self):
1205 return False
1206
1207 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1208
1209 def test_constructor_with_not_writeable(self):
1210 class NotWriteable(MockRawIO):
1211 def writable(self):
1212 return False
1213
1214 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1215
1216 def test_read(self):
1217 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1218
1219 self.assertEqual(pair.read(3), b"abc")
1220 self.assertEqual(pair.read(1), b"d")
1221 self.assertEqual(pair.read(), b"ef")
Benjamin Peterson6b59f772009-12-13 19:30:15 +00001222 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1223 self.assertEqual(pair.read(None), b"abc")
1224
1225 def test_readlines(self):
1226 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1227 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1228 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1229 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001230
1231 def test_read1(self):
1232 # .read1() is delegated to the underlying reader object, so this test
1233 # can be shallow.
1234 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1235
1236 self.assertEqual(pair.read1(3), b"abc")
1237
1238 def test_readinto(self):
1239 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1240
1241 data = bytearray(5)
1242 self.assertEqual(pair.readinto(data), 5)
1243 self.assertEqual(data, b"abcde")
1244
1245 def test_write(self):
1246 w = self.MockRawIO()
1247 pair = self.tp(self.MockRawIO(), w)
1248
1249 pair.write(b"abc")
1250 pair.flush()
1251 pair.write(b"def")
1252 pair.flush()
1253 self.assertEqual(w._write_stack, [b"abc", b"def"])
1254
1255 def test_peek(self):
1256 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1257
1258 self.assertTrue(pair.peek(3).startswith(b"abc"))
1259 self.assertEqual(pair.read(3), b"abc")
1260
1261 def test_readable(self):
1262 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1263 self.assertTrue(pair.readable())
1264
1265 def test_writeable(self):
1266 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1267 self.assertTrue(pair.writable())
1268
1269 def test_seekable(self):
1270 # BufferedRWPairs are never seekable, even if their readers and writers
1271 # are.
1272 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1273 self.assertFalse(pair.seekable())
1274
1275 # .flush() is delegated to the underlying writer object and has been
1276 # tested in the test_write method.
1277
1278 def test_close_and_closed(self):
1279 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1280 self.assertFalse(pair.closed)
1281 pair.close()
1282 self.assertTrue(pair.closed)
1283
1284 def test_isatty(self):
1285 class SelectableIsAtty(MockRawIO):
1286 def __init__(self, isatty):
1287 MockRawIO.__init__(self)
1288 self._isatty = isatty
1289
1290 def isatty(self):
1291 return self._isatty
1292
1293 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1294 self.assertFalse(pair.isatty())
1295
1296 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1297 self.assertTrue(pair.isatty())
1298
1299 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1300 self.assertTrue(pair.isatty())
1301
1302 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1303 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001304
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001305class CBufferedRWPairTest(BufferedRWPairTest):
1306 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001307
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001308class PyBufferedRWPairTest(BufferedRWPairTest):
1309 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001310
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001311
1312class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1313 read_mode = "rb+"
1314 write_mode = "wb+"
1315
1316 def test_constructor(self):
1317 BufferedReaderTest.test_constructor(self)
1318 BufferedWriterTest.test_constructor(self)
1319
1320 def test_read_and_write(self):
1321 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001322 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001323
1324 self.assertEqual(b"as", rw.read(2))
1325 rw.write(b"ddd")
1326 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001327 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001328 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001329 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001330
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001331 def test_seek_and_tell(self):
1332 raw = self.BytesIO(b"asdfghjkl")
1333 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001334
1335 self.assertEquals(b"as", rw.read(2))
1336 self.assertEquals(2, rw.tell())
1337 rw.seek(0, 0)
1338 self.assertEquals(b"asdf", rw.read(4))
1339
1340 rw.write(b"asdf")
1341 rw.seek(0, 0)
1342 self.assertEquals(b"asdfasdfl", rw.read())
1343 self.assertEquals(9, rw.tell())
1344 rw.seek(-4, 2)
1345 self.assertEquals(5, rw.tell())
1346 rw.seek(2, 1)
1347 self.assertEquals(7, rw.tell())
1348 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001349 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001350
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001351 def check_flush_and_read(self, read_func):
1352 raw = self.BytesIO(b"abcdefghi")
1353 bufio = self.tp(raw)
1354
1355 self.assertEquals(b"ab", read_func(bufio, 2))
1356 bufio.write(b"12")
1357 self.assertEquals(b"ef", read_func(bufio, 2))
1358 self.assertEquals(6, bufio.tell())
1359 bufio.flush()
1360 self.assertEquals(6, bufio.tell())
1361 self.assertEquals(b"ghi", read_func(bufio))
1362 raw.seek(0, 0)
1363 raw.write(b"XYZ")
1364 # flush() resets the read buffer
1365 bufio.flush()
1366 bufio.seek(0, 0)
1367 self.assertEquals(b"XYZ", read_func(bufio, 3))
1368
1369 def test_flush_and_read(self):
1370 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1371
1372 def test_flush_and_readinto(self):
1373 def _readinto(bufio, n=-1):
1374 b = bytearray(n if n >= 0 else 9999)
1375 n = bufio.readinto(b)
1376 return bytes(b[:n])
1377 self.check_flush_and_read(_readinto)
1378
1379 def test_flush_and_peek(self):
1380 def _peek(bufio, n=-1):
1381 # This relies on the fact that the buffer can contain the whole
1382 # raw stream, otherwise peek() can return less.
1383 b = bufio.peek(n)
1384 if n != -1:
1385 b = b[:n]
1386 bufio.seek(len(b), 1)
1387 return b
1388 self.check_flush_and_read(_peek)
1389
1390 def test_flush_and_write(self):
1391 raw = self.BytesIO(b"abcdefghi")
1392 bufio = self.tp(raw)
1393
1394 bufio.write(b"123")
1395 bufio.flush()
1396 bufio.write(b"45")
1397 bufio.flush()
1398 bufio.seek(0, 0)
1399 self.assertEquals(b"12345fghi", raw.getvalue())
1400 self.assertEquals(b"12345fghi", bufio.read())
1401
1402 def test_threads(self):
1403 BufferedReaderTest.test_threads(self)
1404 BufferedWriterTest.test_threads(self)
1405
1406 def test_writes_and_peek(self):
1407 def _peek(bufio):
1408 bufio.peek(1)
1409 self.check_writes(_peek)
1410 def _peek(bufio):
1411 pos = bufio.tell()
1412 bufio.seek(-1, 1)
1413 bufio.peek(1)
1414 bufio.seek(pos, 0)
1415 self.check_writes(_peek)
1416
1417 def test_writes_and_reads(self):
1418 def _read(bufio):
1419 bufio.seek(-1, 1)
1420 bufio.read(1)
1421 self.check_writes(_read)
1422
1423 def test_writes_and_read1s(self):
1424 def _read1(bufio):
1425 bufio.seek(-1, 1)
1426 bufio.read1(1)
1427 self.check_writes(_read1)
1428
1429 def test_writes_and_readintos(self):
1430 def _read(bufio):
1431 bufio.seek(-1, 1)
1432 bufio.readinto(bytearray(1))
1433 self.check_writes(_read)
1434
Antoine Pitrou0473e562009-08-06 20:52:43 +00001435 def test_write_after_readahead(self):
1436 # Issue #6629: writing after the buffer was filled by readahead should
1437 # first rewind the raw stream.
1438 for overwrite_size in [1, 5]:
1439 raw = self.BytesIO(b"A" * 10)
1440 bufio = self.tp(raw, 4)
1441 # Trigger readahead
1442 self.assertEqual(bufio.read(1), b"A")
1443 self.assertEqual(bufio.tell(), 1)
1444 # Overwriting should rewind the raw stream if it needs so
1445 bufio.write(b"B" * overwrite_size)
1446 self.assertEqual(bufio.tell(), overwrite_size + 1)
1447 # If the write size was smaller than the buffer size, flush() and
1448 # check that rewind happens.
1449 bufio.flush()
1450 self.assertEqual(bufio.tell(), overwrite_size + 1)
1451 s = raw.getvalue()
1452 self.assertEqual(s,
1453 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1454
Antoine Pitrou66f9fea2010-01-31 23:20:26 +00001455 def test_truncate_after_read_or_write(self):
1456 raw = self.BytesIO(b"A" * 10)
1457 bufio = self.tp(raw, 100)
1458 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1459 self.assertEqual(bufio.truncate(), 2)
1460 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1461 self.assertEqual(bufio.truncate(), 4)
1462
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001463 def test_misbehaved_io(self):
1464 BufferedReaderTest.test_misbehaved_io(self)
1465 BufferedWriterTest.test_misbehaved_io(self)
1466
1467class CBufferedRandomTest(BufferedRandomTest):
1468 tp = io.BufferedRandom
1469
1470 def test_constructor(self):
1471 BufferedRandomTest.test_constructor(self)
1472 # The allocation can succeed on 32-bit builds, e.g. with more
1473 # than 2GB RAM and a 64-bit kernel.
1474 if sys.maxsize > 0x7FFFFFFF:
1475 rawio = self.MockRawIO()
1476 bufio = self.tp(rawio)
1477 self.assertRaises((OverflowError, MemoryError, ValueError),
1478 bufio.__init__, rawio, sys.maxsize)
1479
1480 def test_garbage_collection(self):
1481 CBufferedReaderTest.test_garbage_collection(self)
1482 CBufferedWriterTest.test_garbage_collection(self)
1483
1484class PyBufferedRandomTest(BufferedRandomTest):
1485 tp = pyio.BufferedRandom
1486
1487
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001488# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1489# properties:
1490# - A single output character can correspond to many bytes of input.
1491# - The number of input bytes to complete the character can be
1492# undetermined until the last input byte is received.
1493# - The number of input bytes can vary depending on previous input.
1494# - A single input byte can correspond to many characters of output.
1495# - The number of output characters can be undetermined until the
1496# last input byte is received.
1497# - The number of output characters can vary depending on previous input.
1498
1499class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1500 """
1501 For testing seek/tell behavior with a stateful, buffering decoder.
1502
1503 Input is a sequence of words. Words may be fixed-length (length set
1504 by input) or variable-length (period-terminated). In variable-length
1505 mode, extra periods are ignored. Possible words are:
1506 - 'i' followed by a number sets the input length, I (maximum 99).
1507 When I is set to 0, words are space-terminated.
1508 - 'o' followed by a number sets the output length, O (maximum 99).
1509 - Any other word is converted into a word followed by a period on
1510 the output. The output word consists of the input word truncated
1511 or padded out with hyphens to make its length equal to O. If O
1512 is 0, the word is output verbatim without truncating or padding.
1513 I and O are initially set to 1. When I changes, any buffered input is
1514 re-scanned according to the new I. EOF also terminates the last word.
1515 """
1516
1517 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001518 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001519 self.reset()
1520
1521 def __repr__(self):
1522 return '<SID %x>' % id(self)
1523
1524 def reset(self):
1525 self.i = 1
1526 self.o = 1
1527 self.buffer = bytearray()
1528
1529 def getstate(self):
1530 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1531 return bytes(self.buffer), i*100 + o
1532
1533 def setstate(self, state):
1534 buffer, io = state
1535 self.buffer = bytearray(buffer)
1536 i, o = divmod(io, 100)
1537 self.i, self.o = i ^ 1, o ^ 1
1538
1539 def decode(self, input, final=False):
1540 output = ''
1541 for b in input:
1542 if self.i == 0: # variable-length, terminated with period
1543 if b == ord('.'):
1544 if self.buffer:
1545 output += self.process_word()
1546 else:
1547 self.buffer.append(b)
1548 else: # fixed-length, terminate after self.i bytes
1549 self.buffer.append(b)
1550 if len(self.buffer) == self.i:
1551 output += self.process_word()
1552 if final and self.buffer: # EOF terminates the last word
1553 output += self.process_word()
1554 return output
1555
1556 def process_word(self):
1557 output = ''
1558 if self.buffer[0] == ord('i'):
1559 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1560 elif self.buffer[0] == ord('o'):
1561 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1562 else:
1563 output = self.buffer.decode('ascii')
1564 if len(output) < self.o:
1565 output += '-'*self.o # pad out with hyphens
1566 if self.o:
1567 output = output[:self.o] # truncate to output length
1568 output += '.'
1569 self.buffer = bytearray()
1570 return output
1571
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001572 codecEnabled = False
1573
1574 @classmethod
1575 def lookupTestDecoder(cls, name):
1576 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001577 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001578 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001579 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001580 incrementalencoder=None,
1581 streamreader=None, streamwriter=None,
1582 incrementaldecoder=cls)
1583
1584# Register the previous decoder for testing.
1585# Disabled by default, tests will enable it.
1586codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1587
1588
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001589class StatefulIncrementalDecoderTest(unittest.TestCase):
1590 """
1591 Make sure the StatefulIncrementalDecoder actually works.
1592 """
1593
1594 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001595 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001596 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001597 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001598 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001599 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001600 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001601 # I=0, O=6 (variable-length input, fixed-length output)
1602 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1603 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001604 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001605 # I=6, O=3 (fixed-length input > fixed-length output)
1606 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1607 # I=0, then 3; O=29, then 15 (with longer output)
1608 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1609 'a----------------------------.' +
1610 'b----------------------------.' +
1611 'cde--------------------------.' +
1612 'abcdefghijabcde.' +
1613 'a.b------------.' +
1614 '.c.------------.' +
1615 'd.e------------.' +
1616 'k--------------.' +
1617 'l--------------.' +
1618 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001619 ]
1620
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001621 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001622 # Try a few one-shot test cases.
1623 for input, eof, output in self.test_cases:
1624 d = StatefulIncrementalDecoder()
1625 self.assertEquals(d.decode(input, eof), output)
1626
1627 # Also test an unfinished decode, followed by forcing EOF.
1628 d = StatefulIncrementalDecoder()
1629 self.assertEquals(d.decode(b'oiabcd'), '')
1630 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001631
1632class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001633
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001634 def setUp(self):
1635 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1636 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001637 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001638
Guido van Rossumd0712812007-04-11 16:32:43 +00001639 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001640 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001641
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001642 def test_constructor(self):
1643 r = self.BytesIO(b"\xc3\xa9\n\n")
1644 b = self.BufferedReader(r, 1000)
1645 t = self.TextIOWrapper(b)
1646 t.__init__(b, encoding="latin1", newline="\r\n")
1647 self.assertEquals(t.encoding, "latin1")
1648 self.assertEquals(t.line_buffering, False)
1649 t.__init__(b, encoding="utf8", line_buffering=True)
1650 self.assertEquals(t.encoding, "utf8")
1651 self.assertEquals(t.line_buffering, True)
1652 self.assertEquals("\xe9\n", t.readline())
1653 self.assertRaises(TypeError, t.__init__, b, newline=42)
1654 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1655
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001656 def test_detach(self):
1657 r = self.BytesIO()
1658 b = self.BufferedWriter(r)
1659 t = self.TextIOWrapper(b)
1660 self.assertIs(t.detach(), b)
1661
1662 t = self.TextIOWrapper(b, encoding="ascii")
1663 t.write("howdy")
1664 self.assertFalse(r.getvalue())
1665 t.detach()
1666 self.assertEqual(r.getvalue(), b"howdy")
1667 self.assertRaises(ValueError, t.detach)
1668
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001669 def test_repr(self):
1670 raw = self.BytesIO("hello".encode("utf-8"))
1671 b = self.BufferedReader(raw)
1672 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001673 modname = self.TextIOWrapper.__module__
1674 self.assertEqual(repr(t),
1675 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1676 raw.name = "dummy"
1677 self.assertEqual(repr(t),
1678 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1679 raw.name = b"dummy"
1680 self.assertEqual(repr(t),
1681 "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001682
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001683 def test_line_buffering(self):
1684 r = self.BytesIO()
1685 b = self.BufferedWriter(r, 1000)
1686 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001687 t.write("X")
1688 self.assertEquals(r.getvalue(), b"") # No flush happened
1689 t.write("Y\nZ")
1690 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1691 t.write("A\rB")
1692 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1693
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001694 def test_encoding(self):
1695 # Check the encoding attribute is always set, and valid
1696 b = self.BytesIO()
1697 t = self.TextIOWrapper(b, encoding="utf8")
1698 self.assertEqual(t.encoding, "utf8")
1699 t = self.TextIOWrapper(b)
Georg Brandlab91fde2009-08-13 08:51:18 +00001700 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001701 codecs.lookup(t.encoding)
1702
1703 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001704 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001705 b = self.BytesIO(b"abc\n\xff\n")
1706 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001707 self.assertRaises(UnicodeError, t.read)
1708 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001709 b = self.BytesIO(b"abc\n\xff\n")
1710 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001711 self.assertRaises(UnicodeError, t.read)
1712 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001713 b = self.BytesIO(b"abc\n\xff\n")
1714 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001715 self.assertEquals(t.read(), "abc\n\n")
1716 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001717 b = self.BytesIO(b"abc\n\xff\n")
1718 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001719 self.assertEquals(t.read(), "abc\n\ufffd\n")
1720
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001721 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001722 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001723 b = self.BytesIO()
1724 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001725 self.assertRaises(UnicodeError, t.write, "\xff")
1726 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001727 b = self.BytesIO()
1728 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001729 self.assertRaises(UnicodeError, t.write, "\xff")
1730 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001731 b = self.BytesIO()
1732 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001733 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001734 t.write("abc\xffdef\n")
1735 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001736 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001737 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001738 b = self.BytesIO()
1739 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001740 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001741 t.write("abc\xffdef\n")
1742 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001743 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001744
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001745 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001746 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1747
1748 tests = [
1749 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001750 [ '', input_lines ],
1751 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1752 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1753 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001754 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001755 encodings = (
1756 'utf-8', 'latin-1',
1757 'utf-16', 'utf-16-le', 'utf-16-be',
1758 'utf-32', 'utf-32-le', 'utf-32-be',
1759 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001760
Guido van Rossum8358db22007-08-18 21:39:55 +00001761 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001762 # character in TextIOWrapper._pending_line.
1763 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001764 # XXX: str.encode() should return bytes
1765 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001766 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001767 for bufsize in range(1, 10):
1768 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001769 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1770 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001771 encoding=encoding)
1772 if do_reads:
1773 got_lines = []
1774 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001775 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001776 if c2 == '':
1777 break
1778 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001779 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001780 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001781 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001782
1783 for got_line, exp_line in zip(got_lines, exp_lines):
1784 self.assertEquals(got_line, exp_line)
1785 self.assertEquals(len(got_lines), len(exp_lines))
1786
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001787 def test_newlines_input(self):
1788 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001789 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1790 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001791 (None, normalized.decode("ascii").splitlines(True)),
1792 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001793 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1794 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1795 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001796 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001797 buf = self.BytesIO(testdata)
1798 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001799 self.assertEquals(txt.readlines(), expected)
1800 txt.seek(0)
1801 self.assertEquals(txt.read(), "".join(expected))
1802
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001803 def test_newlines_output(self):
1804 testdict = {
1805 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1806 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1807 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1808 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1809 }
1810 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1811 for newline, expected in tests:
1812 buf = self.BytesIO()
1813 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1814 txt.write("AAA\nB")
1815 txt.write("BB\nCCC\n")
1816 txt.write("X\rY\r\nZ")
1817 txt.flush()
1818 self.assertEquals(buf.closed, False)
1819 self.assertEquals(buf.getvalue(), expected)
1820
1821 def test_destructor(self):
1822 l = []
1823 base = self.BytesIO
1824 class MyBytesIO(base):
1825 def close(self):
1826 l.append(self.getvalue())
1827 base.close(self)
1828 b = MyBytesIO()
1829 t = self.TextIOWrapper(b, encoding="ascii")
1830 t.write("abc")
1831 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001832 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001833 self.assertEquals([b"abc"], l)
1834
1835 def test_override_destructor(self):
1836 record = []
1837 class MyTextIO(self.TextIOWrapper):
1838 def __del__(self):
1839 record.append(1)
1840 try:
1841 f = super().__del__
1842 except AttributeError:
1843 pass
1844 else:
1845 f()
1846 def close(self):
1847 record.append(2)
1848 super().close()
1849 def flush(self):
1850 record.append(3)
1851 super().flush()
1852 b = self.BytesIO()
1853 t = MyTextIO(b, encoding="ascii")
1854 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001855 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001856 self.assertEqual(record, [1, 2, 3])
1857
1858 def test_error_through_destructor(self):
1859 # Test that the exception state is not modified by a destructor,
1860 # even if close() fails.
1861 rawio = self.CloseFailureIO()
1862 def f():
1863 self.TextIOWrapper(rawio).xyzzy
1864 with support.captured_output("stderr") as s:
1865 self.assertRaises(AttributeError, f)
1866 s = s.getvalue().strip()
1867 if s:
1868 # The destructor *may* have printed an unraisable error, check it
1869 self.assertEqual(len(s.splitlines()), 1)
Georg Brandlab91fde2009-08-13 08:51:18 +00001870 self.assertTrue(s.startswith("Exception IOError: "), s)
1871 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001872
Guido van Rossum9b76da62007-04-11 01:09:03 +00001873 # Systematic tests of the text I/O API
1874
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001875 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001876 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1877 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001878 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001879 f._CHUNK_SIZE = chunksize
1880 self.assertEquals(f.write("abc"), 3)
1881 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001882 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001883 f._CHUNK_SIZE = chunksize
1884 self.assertEquals(f.tell(), 0)
1885 self.assertEquals(f.read(), "abc")
1886 cookie = f.tell()
1887 self.assertEquals(f.seek(0), 0)
Benjamin Peterson6b59f772009-12-13 19:30:15 +00001888 self.assertEquals(f.read(None), "abc")
1889 f.seek(0)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001890 self.assertEquals(f.read(2), "ab")
1891 self.assertEquals(f.read(1), "c")
1892 self.assertEquals(f.read(1), "")
1893 self.assertEquals(f.read(), "")
1894 self.assertEquals(f.tell(), cookie)
1895 self.assertEquals(f.seek(0), 0)
1896 self.assertEquals(f.seek(0, 2), cookie)
1897 self.assertEquals(f.write("def"), 3)
1898 self.assertEquals(f.seek(cookie), cookie)
1899 self.assertEquals(f.read(), "def")
1900 if enc.startswith("utf"):
1901 self.multi_line_test(f, enc)
1902 f.close()
1903
1904 def multi_line_test(self, f, enc):
1905 f.seek(0)
1906 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001907 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001908 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001909 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001910 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001911 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001912 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001913 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001914 wlines.append((f.tell(), line))
1915 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001916 f.seek(0)
1917 rlines = []
1918 while True:
1919 pos = f.tell()
1920 line = f.readline()
1921 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001922 break
1923 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +00001924 self.assertEquals(rlines, wlines)
1925
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001926 def test_telling(self):
1927 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001928 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001929 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001930 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001931 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001932 p2 = f.tell()
1933 f.seek(0)
1934 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001935 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001936 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001937 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001938 self.assertEquals(f.tell(), p2)
1939 f.seek(0)
1940 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001941 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001942 self.assertRaises(IOError, f.tell)
1943 self.assertEquals(f.tell(), p2)
1944 f.close()
1945
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001946 def test_seeking(self):
1947 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001948 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001949 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001950 prefix = bytes(u_prefix.encode("utf-8"))
1951 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001952 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001953 suffix = bytes(u_suffix.encode("utf-8"))
1954 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001955 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001956 f.write(line*2)
1957 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001958 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001959 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001960 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001961 self.assertEquals(f.tell(), prefix_size)
1962 self.assertEquals(f.readline(), u_suffix)
1963
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001964 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00001965 # Regression test for a specific bug
1966 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001967 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001968 f.write(data)
1969 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001970 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001971 f._CHUNK_SIZE # Just test that it exists
1972 f._CHUNK_SIZE = 2
1973 f.readline()
1974 f.tell()
1975
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001976 def test_seek_and_tell(self):
1977 #Test seek/tell using the StatefulIncrementalDecoder.
1978 # Make test faster by doing smaller seeks
1979 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001980
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001981 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001982 """Tell/seek to various points within a data stream and ensure
1983 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001984 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001985 f.write(data)
1986 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001987 f = self.open(support.TESTFN, encoding='test_decoder')
1988 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001989 decoded = f.read()
1990 f.close()
1991
Neal Norwitze2b07052008-03-18 19:52:05 +00001992 for i in range(min_pos, len(decoded) + 1): # seek positions
1993 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001994 f = self.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001995 self.assertEquals(f.read(i), decoded[:i])
1996 cookie = f.tell()
1997 self.assertEquals(f.read(j), decoded[i:i + j])
1998 f.seek(cookie)
1999 self.assertEquals(f.read(), decoded[i:])
2000 f.close()
2001
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002002 # Enable the test decoder.
2003 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002004
2005 # Run the tests.
2006 try:
2007 # Try each test case.
2008 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002009 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002010
2011 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002012 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2013 offset = CHUNK_SIZE - len(input)//2
2014 prefix = b'.'*offset
2015 # Don't bother seeking into the prefix (takes too long).
2016 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002017 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002018
2019 # Ensure our test decoder won't interfere with subsequent tests.
2020 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002021 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002022
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002023 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002024 data = "1234567890"
2025 tests = ("utf-16",
2026 "utf-16-le",
2027 "utf-16-be",
2028 "utf-32",
2029 "utf-32-le",
2030 "utf-32-be")
2031 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002032 buf = self.BytesIO()
2033 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002034 # Check if the BOM is written only once (see issue1753).
2035 f.write(data)
2036 f.write(data)
2037 f.seek(0)
2038 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002039 f.seek(0)
2040 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002041 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
2042
Benjamin Petersona1b49012009-03-31 23:11:32 +00002043 def test_unreadable(self):
2044 class UnReadable(self.BytesIO):
2045 def readable(self):
2046 return False
2047 txt = self.TextIOWrapper(UnReadable())
2048 self.assertRaises(IOError, txt.read)
2049
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002050 def test_read_one_by_one(self):
2051 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002052 reads = ""
2053 while True:
2054 c = txt.read(1)
2055 if not c:
2056 break
2057 reads += c
2058 self.assertEquals(reads, "AA\nBB")
2059
Benjamin Peterson6b59f772009-12-13 19:30:15 +00002060 def test_readlines(self):
2061 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2062 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2063 txt.seek(0)
2064 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2065 txt.seek(0)
2066 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2067
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002068 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002069 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002070 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002071 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002072 reads = ""
2073 while True:
2074 c = txt.read(128)
2075 if not c:
2076 break
2077 reads += c
2078 self.assertEquals(reads, "A"*127+"\nB")
2079
2080 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002081 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002082
2083 # read one char at a time
2084 reads = ""
2085 while True:
2086 c = txt.read(1)
2087 if not c:
2088 break
2089 reads += c
2090 self.assertEquals(reads, self.normalized)
2091
2092 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002093 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002094 txt._CHUNK_SIZE = 4
2095
2096 reads = ""
2097 while True:
2098 c = txt.read(4)
2099 if not c:
2100 break
2101 reads += c
2102 self.assertEquals(reads, self.normalized)
2103
2104 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002105 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002106 txt._CHUNK_SIZE = 4
2107
2108 reads = txt.read(4)
2109 reads += txt.read(4)
2110 reads += txt.readline()
2111 reads += txt.readline()
2112 reads += txt.readline()
2113 self.assertEquals(reads, self.normalized)
2114
2115 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002116 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002117 txt._CHUNK_SIZE = 4
2118
2119 reads = txt.read(4)
2120 reads += txt.read()
2121 self.assertEquals(reads, self.normalized)
2122
2123 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002124 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002125 txt._CHUNK_SIZE = 4
2126
2127 reads = txt.read(4)
2128 pos = txt.tell()
2129 txt.seek(0)
2130 txt.seek(pos)
2131 self.assertEquals(txt.read(4), "BBB\n")
2132
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002133 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002134 buffer = self.BytesIO(self.testdata)
2135 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002136
2137 self.assertEqual(buffer.seekable(), txt.seekable())
2138
Antoine Pitroue4501852009-05-14 18:55:55 +00002139 def test_append_bom(self):
2140 # The BOM is not written again when appending to a non-empty file
2141 filename = support.TESTFN
2142 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2143 with self.open(filename, 'w', encoding=charset) as f:
2144 f.write('aaa')
2145 pos = f.tell()
2146 with self.open(filename, 'rb') as f:
2147 self.assertEquals(f.read(), 'aaa'.encode(charset))
2148
2149 with self.open(filename, 'a', encoding=charset) as f:
2150 f.write('xxx')
2151 with self.open(filename, 'rb') as f:
2152 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
2153
2154 def test_seek_bom(self):
2155 # Same test, but when seeking manually
2156 filename = support.TESTFN
2157 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2158 with self.open(filename, 'w', encoding=charset) as f:
2159 f.write('aaa')
2160 pos = f.tell()
2161 with self.open(filename, 'r+', encoding=charset) as f:
2162 f.seek(pos)
2163 f.write('zzz')
2164 f.seek(0)
2165 f.write('bbb')
2166 with self.open(filename, 'rb') as f:
2167 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2168
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002169 def test_errors_property(self):
2170 with self.open(support.TESTFN, "w") as f:
2171 self.assertEqual(f.errors, "strict")
2172 with self.open(support.TESTFN, "w", errors="replace") as f:
2173 self.assertEqual(f.errors, "replace")
2174
Antoine Pitroue4501852009-05-14 18:55:55 +00002175
Amaury Forgeot d'Arcaf0312a2009-08-29 23:19:16 +00002176 def test_threads_write(self):
2177 # Issue6750: concurrent writes could duplicate data
2178 event = threading.Event()
2179 with self.open(support.TESTFN, "w", buffering=1) as f:
2180 def run(n):
2181 text = "Thread%03d\n" % n
2182 event.wait()
2183 f.write(text)
2184 threads = [threading.Thread(target=lambda n=x: run(n))
2185 for x in range(20)]
2186 for t in threads:
2187 t.start()
2188 time.sleep(0.02)
2189 event.set()
2190 for t in threads:
2191 t.join()
2192 with self.open(support.TESTFN) as f:
2193 content = f.read()
2194 for n in range(20):
2195 self.assertEquals(content.count("Thread%03d\n" % n), 1)
2196
Antoine Pitroufaf90072010-05-03 16:58:19 +00002197 def test_flush_error_on_close(self):
2198 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2199 def bad_flush():
2200 raise IOError()
2201 txt.flush = bad_flush
2202 self.assertRaises(IOError, txt.close) # exception not swallowed
2203
2204 def test_multi_close(self):
2205 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2206 txt.close()
2207 txt.close()
2208 txt.close()
2209 self.assertRaises(ValueError, txt.flush)
2210
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002211class CTextIOWrapperTest(TextIOWrapperTest):
2212
2213 def test_initialization(self):
2214 r = self.BytesIO(b"\xc3\xa9\n\n")
2215 b = self.BufferedReader(r, 1000)
2216 t = self.TextIOWrapper(b)
2217 self.assertRaises(TypeError, t.__init__, b, newline=42)
2218 self.assertRaises(ValueError, t.read)
2219 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2220 self.assertRaises(ValueError, t.read)
2221
2222 def test_garbage_collection(self):
2223 # C TextIOWrapper objects are collected, and collecting them flushes
2224 # all data to disk.
2225 # The Python version has __del__, so it ends in gc.garbage instead.
2226 rawio = io.FileIO(support.TESTFN, "wb")
2227 b = self.BufferedWriter(rawio)
2228 t = self.TextIOWrapper(b, encoding="ascii")
2229 t.write("456def")
2230 t.x = t
2231 wr = weakref.ref(t)
2232 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002233 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +00002234 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002235 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002236 self.assertEqual(f.read(), b"456def")
2237
2238class PyTextIOWrapperTest(TextIOWrapperTest):
2239 pass
2240
2241
2242class IncrementalNewlineDecoderTest(unittest.TestCase):
2243
2244 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002245 # UTF-8 specific tests for a newline decoder
2246 def _check_decode(b, s, **kwargs):
2247 # We exercise getstate() / setstate() as well as decode()
2248 state = decoder.getstate()
2249 self.assertEquals(decoder.decode(b, **kwargs), s)
2250 decoder.setstate(state)
2251 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002252
Antoine Pitrou180a3362008-12-14 16:36:46 +00002253 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002254
Antoine Pitrou180a3362008-12-14 16:36:46 +00002255 _check_decode(b'\xe8', "")
2256 _check_decode(b'\xa2', "")
2257 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002258
Antoine Pitrou180a3362008-12-14 16:36:46 +00002259 _check_decode(b'\xe8', "")
2260 _check_decode(b'\xa2', "")
2261 _check_decode(b'\x88', "\u8888")
2262
2263 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002264 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2265
Antoine Pitrou180a3362008-12-14 16:36:46 +00002266 decoder.reset()
2267 _check_decode(b'\n', "\n")
2268 _check_decode(b'\r', "")
2269 _check_decode(b'', "\n", final=True)
2270 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002271
Antoine Pitrou180a3362008-12-14 16:36:46 +00002272 _check_decode(b'\r', "")
2273 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002274
Antoine Pitrou180a3362008-12-14 16:36:46 +00002275 _check_decode(b'\r\r\n', "\n\n")
2276 _check_decode(b'\r', "")
2277 _check_decode(b'\r', "\n")
2278 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002279
Antoine Pitrou180a3362008-12-14 16:36:46 +00002280 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2281 _check_decode(b'\xe8\xa2\x88', "\u8888")
2282 _check_decode(b'\n', "\n")
2283 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2284 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002285
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002286 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002287 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002288 if encoding is not None:
2289 encoder = codecs.getincrementalencoder(encoding)()
2290 def _decode_bytewise(s):
2291 # Decode one byte at a time
2292 for b in encoder.encode(s):
2293 result.append(decoder.decode(bytes([b])))
2294 else:
2295 encoder = None
2296 def _decode_bytewise(s):
2297 # Decode one char at a time
2298 for c in s:
2299 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002300 self.assertEquals(decoder.newlines, None)
2301 _decode_bytewise("abc\n\r")
2302 self.assertEquals(decoder.newlines, '\n')
2303 _decode_bytewise("\nabc")
2304 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2305 _decode_bytewise("abc\r")
2306 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2307 _decode_bytewise("abc")
2308 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2309 _decode_bytewise("abc\r")
2310 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2311 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002312 input = "abc"
2313 if encoder is not None:
2314 encoder.reset()
2315 input = encoder.encode(input)
2316 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002317 self.assertEquals(decoder.newlines, None)
2318
2319 def test_newline_decoder(self):
2320 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002321 # None meaning the IncrementalNewlineDecoder takes unicode input
2322 # rather than bytes input
2323 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002324 'utf-16', 'utf-16-le', 'utf-16-be',
2325 'utf-32', 'utf-32-le', 'utf-32-be',
2326 )
2327 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002328 decoder = enc and codecs.getincrementaldecoder(enc)()
2329 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2330 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002331 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002332 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2333 self.check_newline_decoding_utf8(decoder)
2334
Antoine Pitrou66913e22009-03-06 23:40:56 +00002335 def test_newline_bytes(self):
2336 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2337 def _check(dec):
2338 self.assertEquals(dec.newlines, None)
2339 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2340 self.assertEquals(dec.newlines, None)
2341 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2342 self.assertEquals(dec.newlines, None)
2343 dec = self.IncrementalNewlineDecoder(None, translate=False)
2344 _check(dec)
2345 dec = self.IncrementalNewlineDecoder(None, translate=True)
2346 _check(dec)
2347
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002348class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2349 pass
2350
2351class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2352 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002353
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002354
Guido van Rossum01a27522007-03-07 01:00:12 +00002355# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002356
Guido van Rossum5abbf752007-08-27 17:39:33 +00002357class MiscIOTest(unittest.TestCase):
2358
Barry Warsaw40e82462008-11-20 20:14:50 +00002359 def tearDown(self):
2360 support.unlink(support.TESTFN)
2361
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002362 def test___all__(self):
2363 for name in self.io.__all__:
2364 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002365 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002366 if name == "open":
2367 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002368 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002369 self.assertTrue(issubclass(obj, Exception), name)
2370 elif not name.startswith("SEEK_"):
2371 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002372
Barry Warsaw40e82462008-11-20 20:14:50 +00002373 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002374 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002375 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002376 f.close()
2377
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002378 f = self.open(support.TESTFN, "U")
Barry Warsaw40e82462008-11-20 20:14:50 +00002379 self.assertEquals(f.name, support.TESTFN)
2380 self.assertEquals(f.buffer.name, support.TESTFN)
2381 self.assertEquals(f.buffer.raw.name, support.TESTFN)
2382 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002383 self.assertEquals(f.buffer.mode, "rb")
2384 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002385 f.close()
2386
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002387 f = self.open(support.TESTFN, "w+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002388 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002389 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2390 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002391
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002392 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002393 self.assertEquals(g.mode, "wb")
2394 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002395 self.assertEquals(g.name, f.fileno())
2396 self.assertEquals(g.raw.name, f.fileno())
2397 f.close()
2398 g.close()
2399
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002400 def test_io_after_close(self):
2401 for kwargs in [
2402 {"mode": "w"},
2403 {"mode": "wb"},
2404 {"mode": "w", "buffering": 1},
2405 {"mode": "w", "buffering": 2},
2406 {"mode": "wb", "buffering": 0},
2407 {"mode": "r"},
2408 {"mode": "rb"},
2409 {"mode": "r", "buffering": 1},
2410 {"mode": "r", "buffering": 2},
2411 {"mode": "rb", "buffering": 0},
2412 {"mode": "w+"},
2413 {"mode": "w+b"},
2414 {"mode": "w+", "buffering": 1},
2415 {"mode": "w+", "buffering": 2},
2416 {"mode": "w+b", "buffering": 0},
2417 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002418 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002419 f.close()
2420 self.assertRaises(ValueError, f.flush)
2421 self.assertRaises(ValueError, f.fileno)
2422 self.assertRaises(ValueError, f.isatty)
2423 self.assertRaises(ValueError, f.__iter__)
2424 if hasattr(f, "peek"):
2425 self.assertRaises(ValueError, f.peek, 1)
2426 self.assertRaises(ValueError, f.read)
2427 if hasattr(f, "read1"):
2428 self.assertRaises(ValueError, f.read1, 1024)
2429 if hasattr(f, "readinto"):
2430 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2431 self.assertRaises(ValueError, f.readline)
2432 self.assertRaises(ValueError, f.readlines)
2433 self.assertRaises(ValueError, f.seek, 0)
2434 self.assertRaises(ValueError, f.tell)
2435 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002436 self.assertRaises(ValueError, f.write,
2437 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002438 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002439 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002440
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002441 def test_blockingioerror(self):
2442 # Various BlockingIOError issues
2443 self.assertRaises(TypeError, self.BlockingIOError)
2444 self.assertRaises(TypeError, self.BlockingIOError, 1)
2445 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2446 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2447 b = self.BlockingIOError(1, "")
2448 self.assertEqual(b.characters_written, 0)
2449 class C(str):
2450 pass
2451 c = C("")
2452 b = self.BlockingIOError(1, c)
2453 c.b = b
2454 b.c = c
2455 wr = weakref.ref(c)
2456 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002457 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +00002458 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002459
2460 def test_abcs(self):
2461 # Test the visible base classes are ABCs.
2462 self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
2463 self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
2464 self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
2465 self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))
2466
2467 def _check_abc_inheritance(self, abcmodule):
2468 with self.open(support.TESTFN, "wb", buffering=0) as f:
2469 self.assertTrue(isinstance(f, abcmodule.IOBase))
2470 self.assertTrue(isinstance(f, abcmodule.RawIOBase))
2471 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2472 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2473 with self.open(support.TESTFN, "wb") as f:
2474 self.assertTrue(isinstance(f, abcmodule.IOBase))
2475 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2476 self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
2477 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2478 with self.open(support.TESTFN, "w") as f:
2479 self.assertTrue(isinstance(f, abcmodule.IOBase))
2480 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2481 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2482 self.assertTrue(isinstance(f, abcmodule.TextIOBase))
2483
2484 def test_abc_inheritance(self):
2485 # Test implementations inherit from their respective ABCs
2486 self._check_abc_inheritance(self)
2487
2488 def test_abc_inheritance_official(self):
2489 # Test implementations inherit from the official ABCs of the
2490 # baseline "io" module.
2491 self._check_abc_inheritance(io)
2492
2493class CMiscIOTest(MiscIOTest):
2494 io = io
2495
2496class PyMiscIOTest(MiscIOTest):
2497 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002498
Antoine Pitrou16b11de2010-08-21 19:17:57 +00002499
2500@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2501class SignalsTest(unittest.TestCase):
2502
2503 def setUp(self):
2504 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2505
2506 def tearDown(self):
2507 signal.signal(signal.SIGALRM, self.oldalrm)
2508
2509 def alarm_interrupt(self, sig, frame):
2510 1/0
2511
2512 @unittest.skipUnless(threading, 'Threading required for this test.')
2513 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2514 """Check that a partial write, when it gets interrupted, properly
2515 invokes the signal handler."""
2516 read_results = []
2517 def _read():
2518 s = os.read(r, 1)
2519 read_results.append(s)
2520 t = threading.Thread(target=_read)
2521 t.daemon = True
2522 r, w = os.pipe()
2523 try:
2524 wio = self.io.open(w, **fdopen_kwargs)
2525 t.start()
2526 signal.alarm(1)
2527 # Fill the pipe enough that the write will be blocking.
2528 # It will be interrupted by the timer armed above. Since the
2529 # other thread has read one byte, the low-level write will
2530 # return with a successful (partial) result rather than an EINTR.
2531 # The buffered IO layer must check for pending signal
2532 # handlers, which in this case will invoke alarm_interrupt().
2533 self.assertRaises(ZeroDivisionError,
2534 wio.write, item * (1024 * 1024))
2535 t.join()
2536 # We got one byte, get another one and check that it isn't a
2537 # repeat of the first one.
2538 read_results.append(os.read(r, 1))
2539 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2540 finally:
2541 os.close(w)
2542 os.close(r)
2543 # This is deliberate. If we didn't close the file descriptor
2544 # before closing wio, wio would try to flush its internal
2545 # buffer, and block again.
2546 try:
2547 wio.close()
2548 except IOError as e:
2549 if e.errno != errno.EBADF:
2550 raise
2551
2552 def test_interrupted_write_unbuffered(self):
2553 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2554
2555 def test_interrupted_write_buffered(self):
2556 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2557
2558 def test_interrupted_write_text(self):
2559 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2560
2561class CSignalsTest(SignalsTest):
2562 io = io
2563
2564class PySignalsTest(SignalsTest):
2565 io = pyio
2566
2567
Guido van Rossum28524c72007-02-27 05:47:44 +00002568def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002569 tests = (CIOTest, PyIOTest,
2570 CBufferedReaderTest, PyBufferedReaderTest,
2571 CBufferedWriterTest, PyBufferedWriterTest,
2572 CBufferedRWPairTest, PyBufferedRWPairTest,
2573 CBufferedRandomTest, PyBufferedRandomTest,
2574 StatefulIncrementalDecoderTest,
2575 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2576 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitrou16b11de2010-08-21 19:17:57 +00002577 CMiscIOTest, PyMiscIOTest,
2578 CSignalsTest, PySignalsTest,
2579 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002580
2581 # Put the namespaces of the IO module we are testing and some useful mock
2582 # classes in the __dict__ of each test.
2583 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitroue5e75c62010-09-14 18:53:07 +00002584 MockNonBlockWriterIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002585 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2586 c_io_ns = {name : getattr(io, name) for name in all_members}
2587 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2588 globs = globals()
2589 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2590 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2591 # Avoid turning open into a bound method.
2592 py_io_ns["open"] = pyio.OpenWrapper
2593 for test in tests:
2594 if test.__name__.startswith("C"):
2595 for name, obj in c_io_ns.items():
2596 setattr(test, name, obj)
2597 elif test.__name__.startswith("Py"):
2598 for name, obj in py_io_ns.items():
2599 setattr(test, name, obj)
2600
2601 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002602
2603if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002604 test_main()