blob: 0ffb4a15ef08409d7490bd44c155a6508facb687 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000045
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000047def _default_chunk_size():
48 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000049 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000050 return f._CHUNK_SIZE
51
52
Antoine Pitrou328ec742010-09-14 18:37:24 +000053class MockRawIOWithoutRead:
54 """A RawIO implementation without read(), so as to exercise the default
55 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000056
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000057 def __init__(self, read_stack=()):
58 self._read_stack = list(read_stack)
59 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000060 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000061 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000062
Guido van Rossum01a27522007-03-07 01:00:12 +000063 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000065 return len(b)
66
67 def writable(self):
68 return True
69
Guido van Rossum68bbcd22007-02-27 17:19:33 +000070 def fileno(self):
71 return 42
72
73 def readable(self):
74 return True
75
Guido van Rossum01a27522007-03-07 01:00:12 +000076 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000077 return True
78
Guido van Rossum01a27522007-03-07 01:00:12 +000079 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000080 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000081
82 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000083 return 0 # same comment as above
84
85 def readinto(self, buf):
86 self._reads += 1
87 max_len = len(buf)
88 try:
89 data = self._read_stack[0]
90 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000091 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000092 return 0
93 if data is None:
94 del self._read_stack[0]
95 return None
96 n = len(data)
97 if len(data) <= max_len:
98 del self._read_stack[0]
99 buf[:n] = data
100 return n
101 else:
102 buf[:] = data[:max_len]
103 self._read_stack[0] = data[max_len:]
104 return max_len
105
106 def truncate(self, pos=None):
107 return pos
108
Antoine Pitrou328ec742010-09-14 18:37:24 +0000109class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
110 pass
111
112class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
113 pass
114
115
116class MockRawIO(MockRawIOWithoutRead):
117
118 def read(self, n=None):
119 self._reads += 1
120 try:
121 return self._read_stack.pop(0)
122 except:
123 self._extraneous_reads += 1
124 return b""
125
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000126class CMockRawIO(MockRawIO, io.RawIOBase):
127 pass
128
129class PyMockRawIO(MockRawIO, pyio.RawIOBase):
130 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000131
Guido van Rossuma9e20242007-03-08 00:43:48 +0000132
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000133class MisbehavedRawIO(MockRawIO):
134 def write(self, b):
135 return super().write(b) * 2
136
137 def read(self, n=None):
138 return super().read(n) * 2
139
140 def seek(self, pos, whence):
141 return -123
142
143 def tell(self):
144 return -456
145
146 def readinto(self, buf):
147 super().readinto(buf)
148 return len(buf) * 5
149
150class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
151 pass
152
153class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
154 pass
155
156
157class CloseFailureIO(MockRawIO):
158 closed = 0
159
160 def close(self):
161 if not self.closed:
162 self.closed = 1
163 raise IOError
164
165class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
166 pass
167
168class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
169 pass
170
171
172class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000173
174 def __init__(self, data):
175 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000176 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000177
178 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000179 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000180 self.read_history.append(None if res is None else len(res))
181 return res
182
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000183 def readinto(self, b):
184 res = super().readinto(b)
185 self.read_history.append(res)
186 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188class CMockFileIO(MockFileIO, io.BytesIO):
189 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000190
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000191class PyMockFileIO(MockFileIO, pyio.BytesIO):
192 pass
193
194
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000195class MockUnseekableIO:
196 def seekable(self):
197 return False
198
199 def seek(self, *args):
200 raise self.UnsupportedOperation("not seekable")
201
202 def tell(self, *args):
203 raise self.UnsupportedOperation("not seekable")
204
205class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
206 UnsupportedOperation = io.UnsupportedOperation
207
208class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
209 UnsupportedOperation = pyio.UnsupportedOperation
210
211
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000212class MockNonBlockWriterIO:
213
214 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000215 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000216 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000217
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000218 def pop_written(self):
219 s = b"".join(self._write_stack)
220 self._write_stack[:] = []
221 return s
222
223 def block_on(self, char):
224 """Block when a given char is encountered."""
225 self._blocker_char = char
226
227 def readable(self):
228 return True
229
230 def seekable(self):
231 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000232
Guido van Rossum01a27522007-03-07 01:00:12 +0000233 def writable(self):
234 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000235
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000236 def write(self, b):
237 b = bytes(b)
238 n = -1
239 if self._blocker_char:
240 try:
241 n = b.index(self._blocker_char)
242 except ValueError:
243 pass
244 else:
245 self._blocker_char = None
246 self._write_stack.append(b[:n])
247 raise self.BlockingIOError(0, "test blocking", n)
248 self._write_stack.append(b)
249 return len(b)
250
251class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
252 BlockingIOError = io.BlockingIOError
253
254class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
255 BlockingIOError = pyio.BlockingIOError
256
Guido van Rossuma9e20242007-03-08 00:43:48 +0000257
Guido van Rossum28524c72007-02-27 05:47:44 +0000258class IOTest(unittest.TestCase):
259
Neal Norwitze7789b12008-03-24 06:18:09 +0000260 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000261 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000262
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000263 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000264 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000265
Guido van Rossum28524c72007-02-27 05:47:44 +0000266 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000267 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000268 f.truncate(0)
269 self.assertEqual(f.tell(), 5)
270 f.seek(0)
271
272 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000273 self.assertEqual(f.seek(0), 0)
274 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000275 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000276 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000277 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000278 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000279 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000280 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000281 self.assertEqual(f.seek(-1, 2), 13)
282 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000283
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000285 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000286 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000287
Guido van Rossum9b76da62007-04-11 01:09:03 +0000288 def read_ops(self, f, buffered=False):
289 data = f.read(5)
290 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000291 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000292 self.assertEqual(f.readinto(data), 5)
293 self.assertEqual(data, b" worl")
294 self.assertEqual(f.readinto(data), 2)
295 self.assertEqual(len(data), 5)
296 self.assertEqual(data[:2], b"d\n")
297 self.assertEqual(f.seek(0), 0)
298 self.assertEqual(f.read(20), b"hello world\n")
299 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000300 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000301 self.assertEqual(f.seek(-6, 2), 6)
302 self.assertEqual(f.read(5), b"world")
303 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000304 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000305 self.assertEqual(f.seek(-6, 1), 5)
306 self.assertEqual(f.read(5), b" worl")
307 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000308 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000309 if buffered:
310 f.seek(0)
311 self.assertEqual(f.read(), b"hello world\n")
312 f.seek(6)
313 self.assertEqual(f.read(), b"world\n")
314 self.assertEqual(f.read(), b"")
315
Guido van Rossum34d69e52007-04-10 20:08:41 +0000316 LARGE = 2**31
317
Guido van Rossum53807da2007-04-10 19:01:47 +0000318 def large_file_ops(self, f):
319 assert f.readable()
320 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000321 self.assertEqual(f.seek(self.LARGE), self.LARGE)
322 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000323 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000324 self.assertEqual(f.tell(), self.LARGE + 3)
325 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000326 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000327 self.assertEqual(f.tell(), self.LARGE + 2)
328 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000329 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000330 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000331 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
332 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000333 self.assertEqual(f.read(2), b"x")
334
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000335 def test_invalid_operations(self):
336 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000337 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000338 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000339 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000340 self.assertRaises(exc, fp.read)
341 self.assertRaises(exc, fp.readline)
342 with self.open(support.TESTFN, "wb", buffering=0) as fp:
343 self.assertRaises(exc, fp.read)
344 self.assertRaises(exc, fp.readline)
345 with self.open(support.TESTFN, "rb", buffering=0) as fp:
346 self.assertRaises(exc, fp.write, b"blah")
347 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000348 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000349 self.assertRaises(exc, fp.write, b"blah")
350 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000351 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000352 self.assertRaises(exc, fp.write, "blah")
353 self.assertRaises(exc, fp.writelines, ["blah\n"])
354 # Non-zero seeking from current or end pos
355 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
356 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000357
Guido van Rossum28524c72007-02-27 05:47:44 +0000358 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000359 with self.open(support.TESTFN, "wb", buffering=0) as f:
360 self.assertEqual(f.readable(), False)
361 self.assertEqual(f.writable(), True)
362 self.assertEqual(f.seekable(), True)
363 self.write_ops(f)
364 with self.open(support.TESTFN, "rb", buffering=0) as f:
365 self.assertEqual(f.readable(), True)
366 self.assertEqual(f.writable(), False)
367 self.assertEqual(f.seekable(), True)
368 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000369
Guido van Rossum87429772007-04-10 21:06:59 +0000370 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000371 with self.open(support.TESTFN, "wb") as f:
372 self.assertEqual(f.readable(), False)
373 self.assertEqual(f.writable(), True)
374 self.assertEqual(f.seekable(), True)
375 self.write_ops(f)
376 with self.open(support.TESTFN, "rb") as f:
377 self.assertEqual(f.readable(), True)
378 self.assertEqual(f.writable(), False)
379 self.assertEqual(f.seekable(), True)
380 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000381
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000382 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000383 with self.open(support.TESTFN, "wb") as f:
384 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
385 with self.open(support.TESTFN, "rb") as f:
386 self.assertEqual(f.readline(), b"abc\n")
387 self.assertEqual(f.readline(10), b"def\n")
388 self.assertEqual(f.readline(2), b"xy")
389 self.assertEqual(f.readline(4), b"zzy\n")
390 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000391 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000392 self.assertRaises(TypeError, f.readline, 5.3)
393 with self.open(support.TESTFN, "r") as f:
394 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000395
Guido van Rossum28524c72007-02-27 05:47:44 +0000396 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000397 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000398 self.write_ops(f)
399 data = f.getvalue()
400 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000401 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000402 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000403
Guido van Rossum53807da2007-04-10 19:01:47 +0000404 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000405 # On Windows and Mac OSX this test comsumes large resources; It takes
406 # a long time to build the >2GB file and takes >2GB of disk space
407 # therefore the resource must be enabled to run this test.
408 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000409 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000410 print("\nTesting large file ops skipped on %s." % sys.platform,
411 file=sys.stderr)
412 print("It requires %d bytes and a long time." % self.LARGE,
413 file=sys.stderr)
414 print("Use 'regrtest.py -u largefile test_io' to run it.",
415 file=sys.stderr)
416 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000417 with self.open(support.TESTFN, "w+b", 0) as f:
418 self.large_file_ops(f)
419 with self.open(support.TESTFN, "w+b") as f:
420 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000421
422 def test_with_open(self):
423 for bufsize in (0, 1, 100):
424 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000425 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000426 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000427 self.assertEqual(f.closed, True)
428 f = None
429 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000430 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000431 1/0
432 except ZeroDivisionError:
433 self.assertEqual(f.closed, True)
434 else:
435 self.fail("1/0 didn't raise an exception")
436
Antoine Pitrou08838b62009-01-21 00:55:13 +0000437 # issue 5008
438 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000439 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000440 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000441 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000442 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000443 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000444 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000445 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000446 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000447
Guido van Rossum87429772007-04-10 21:06:59 +0000448 def test_destructor(self):
449 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000450 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000451 def __del__(self):
452 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 try:
454 f = super().__del__
455 except AttributeError:
456 pass
457 else:
458 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000459 def close(self):
460 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000461 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000462 def flush(self):
463 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000464 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000465 with support.check_warnings(('', ResourceWarning)):
466 f = MyFileIO(support.TESTFN, "wb")
467 f.write(b"xxx")
468 del f
469 support.gc_collect()
470 self.assertEqual(record, [1, 2, 3])
471 with self.open(support.TESTFN, "rb") as f:
472 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000473
474 def _check_base_destructor(self, base):
475 record = []
476 class MyIO(base):
477 def __init__(self):
478 # This exercises the availability of attributes on object
479 # destruction.
480 # (in the C version, close() is called by the tp_dealloc
481 # function, not by __del__)
482 self.on_del = 1
483 self.on_close = 2
484 self.on_flush = 3
485 def __del__(self):
486 record.append(self.on_del)
487 try:
488 f = super().__del__
489 except AttributeError:
490 pass
491 else:
492 f()
493 def close(self):
494 record.append(self.on_close)
495 super().close()
496 def flush(self):
497 record.append(self.on_flush)
498 super().flush()
499 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000500 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000501 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000502 self.assertEqual(record, [1, 2, 3])
503
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000504 def test_IOBase_destructor(self):
505 self._check_base_destructor(self.IOBase)
506
507 def test_RawIOBase_destructor(self):
508 self._check_base_destructor(self.RawIOBase)
509
510 def test_BufferedIOBase_destructor(self):
511 self._check_base_destructor(self.BufferedIOBase)
512
513 def test_TextIOBase_destructor(self):
514 self._check_base_destructor(self.TextIOBase)
515
Guido van Rossum87429772007-04-10 21:06:59 +0000516 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000517 with self.open(support.TESTFN, "wb") as f:
518 f.write(b"xxx")
519 with self.open(support.TESTFN, "rb") as f:
520 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000521
Guido van Rossumd4103952007-04-12 05:44:49 +0000522 def test_array_writes(self):
523 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000524 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000525 with self.open(support.TESTFN, "wb", 0) as f:
526 self.assertEqual(f.write(a), n)
527 with self.open(support.TESTFN, "wb") as f:
528 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000529
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000530 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000531 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000532 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000533
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000534 def test_read_closed(self):
535 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000536 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000537 with self.open(support.TESTFN, "r") as f:
538 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000539 self.assertEqual(file.read(), "egg\n")
540 file.seek(0)
541 file.close()
542 self.assertRaises(ValueError, file.read)
543
544 def test_no_closefd_with_filename(self):
545 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000546 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000547
548 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000549 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000550 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000551 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000552 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000553 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000554 self.assertEqual(file.buffer.raw.closefd, False)
555
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000556 def test_garbage_collection(self):
557 # FileIO objects are collected, and collecting them flushes
558 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000559 with support.check_warnings(('', ResourceWarning)):
560 f = self.FileIO(support.TESTFN, "wb")
561 f.write(b"abcxxx")
562 f.f = f
563 wr = weakref.ref(f)
564 del f
565 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000566 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000567 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000568 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000569
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000570 def test_unbounded_file(self):
571 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
572 zero = "/dev/zero"
573 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000574 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000575 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000576 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000577 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000578 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000579 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000580 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000581 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000582 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000583 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000584 self.assertRaises(OverflowError, f.read)
585
Antoine Pitrou6be88762010-05-03 16:48:20 +0000586 def test_flush_error_on_close(self):
587 f = self.open(support.TESTFN, "wb", buffering=0)
588 def bad_flush():
589 raise IOError()
590 f.flush = bad_flush
591 self.assertRaises(IOError, f.close) # exception not swallowed
592
593 def test_multi_close(self):
594 f = self.open(support.TESTFN, "wb", buffering=0)
595 f.close()
596 f.close()
597 f.close()
598 self.assertRaises(ValueError, f.flush)
599
Antoine Pitrou328ec742010-09-14 18:37:24 +0000600 def test_RawIOBase_read(self):
601 # Exercise the default RawIOBase.read() implementation (which calls
602 # readinto() internally).
603 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
604 self.assertEqual(rawio.read(2), b"ab")
605 self.assertEqual(rawio.read(2), b"c")
606 self.assertEqual(rawio.read(2), b"d")
607 self.assertEqual(rawio.read(2), None)
608 self.assertEqual(rawio.read(2), b"ef")
609 self.assertEqual(rawio.read(2), b"g")
610 self.assertEqual(rawio.read(2), None)
611 self.assertEqual(rawio.read(2), b"")
612
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000613class CIOTest(IOTest):
614 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000615
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000616class PyIOTest(IOTest):
617 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000618
Guido van Rossuma9e20242007-03-08 00:43:48 +0000619
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000620class CommonBufferedTests:
621 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
622
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000623 def test_detach(self):
624 raw = self.MockRawIO()
625 buf = self.tp(raw)
626 self.assertIs(buf.detach(), raw)
627 self.assertRaises(ValueError, buf.detach)
628
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000629 def test_fileno(self):
630 rawio = self.MockRawIO()
631 bufio = self.tp(rawio)
632
Ezio Melottib3aedd42010-11-20 19:04:17 +0000633 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000634
635 def test_no_fileno(self):
636 # XXX will we always have fileno() function? If so, kill
637 # this test. Else, write it.
638 pass
639
640 def test_invalid_args(self):
641 rawio = self.MockRawIO()
642 bufio = self.tp(rawio)
643 # Invalid whence
644 self.assertRaises(ValueError, bufio.seek, 0, -1)
645 self.assertRaises(ValueError, bufio.seek, 0, 3)
646
647 def test_override_destructor(self):
648 tp = self.tp
649 record = []
650 class MyBufferedIO(tp):
651 def __del__(self):
652 record.append(1)
653 try:
654 f = super().__del__
655 except AttributeError:
656 pass
657 else:
658 f()
659 def close(self):
660 record.append(2)
661 super().close()
662 def flush(self):
663 record.append(3)
664 super().flush()
665 rawio = self.MockRawIO()
666 bufio = MyBufferedIO(rawio)
667 writable = bufio.writable()
668 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000669 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000670 if writable:
671 self.assertEqual(record, [1, 2, 3])
672 else:
673 self.assertEqual(record, [1, 2])
674
675 def test_context_manager(self):
676 # Test usability as a context manager
677 rawio = self.MockRawIO()
678 bufio = self.tp(rawio)
679 def _with():
680 with bufio:
681 pass
682 _with()
683 # bufio should now be closed, and using it a second time should raise
684 # a ValueError.
685 self.assertRaises(ValueError, _with)
686
687 def test_error_through_destructor(self):
688 # Test that the exception state is not modified by a destructor,
689 # even if close() fails.
690 rawio = self.CloseFailureIO()
691 def f():
692 self.tp(rawio).xyzzy
693 with support.captured_output("stderr") as s:
694 self.assertRaises(AttributeError, f)
695 s = s.getvalue().strip()
696 if s:
697 # The destructor *may* have printed an unraisable error, check it
698 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000699 self.assertTrue(s.startswith("Exception IOError: "), s)
700 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000701
Antoine Pitrou716c4442009-05-23 19:04:03 +0000702 def test_repr(self):
703 raw = self.MockRawIO()
704 b = self.tp(raw)
705 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
706 self.assertEqual(repr(b), "<%s>" % clsname)
707 raw.name = "dummy"
708 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
709 raw.name = b"dummy"
710 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
711
Antoine Pitrou6be88762010-05-03 16:48:20 +0000712 def test_flush_error_on_close(self):
713 raw = self.MockRawIO()
714 def bad_flush():
715 raise IOError()
716 raw.flush = bad_flush
717 b = self.tp(raw)
718 self.assertRaises(IOError, b.close) # exception not swallowed
719
720 def test_multi_close(self):
721 raw = self.MockRawIO()
722 b = self.tp(raw)
723 b.close()
724 b.close()
725 b.close()
726 self.assertRaises(ValueError, b.flush)
727
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000728 def test_unseekable(self):
729 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
730 self.assertRaises(self.UnsupportedOperation, bufio.tell)
731 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
732
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000733 def test_readonly_attributes(self):
734 raw = self.MockRawIO()
735 buf = self.tp(raw)
736 x = self.MockRawIO()
737 with self.assertRaises(AttributeError):
738 buf.raw = x
739
Guido van Rossum78892e42007-04-06 17:31:18 +0000740
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000741class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
742 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000743
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000744 def test_constructor(self):
745 rawio = self.MockRawIO([b"abc"])
746 bufio = self.tp(rawio)
747 bufio.__init__(rawio)
748 bufio.__init__(rawio, buffer_size=1024)
749 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000750 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000751 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
752 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
753 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
754 rawio = self.MockRawIO([b"abc"])
755 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000756 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000757
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000758 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000759 for arg in (None, 7):
760 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
761 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000762 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000763 # Invalid args
764 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000765
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000766 def test_read1(self):
767 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
768 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000769 self.assertEqual(b"a", bufio.read(1))
770 self.assertEqual(b"b", bufio.read1(1))
771 self.assertEqual(rawio._reads, 1)
772 self.assertEqual(b"c", bufio.read1(100))
773 self.assertEqual(rawio._reads, 1)
774 self.assertEqual(b"d", bufio.read1(100))
775 self.assertEqual(rawio._reads, 2)
776 self.assertEqual(b"efg", bufio.read1(100))
777 self.assertEqual(rawio._reads, 3)
778 self.assertEqual(b"", bufio.read1(100))
779 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000780 # Invalid args
781 self.assertRaises(ValueError, bufio.read1, -1)
782
783 def test_readinto(self):
784 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
785 bufio = self.tp(rawio)
786 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000787 self.assertEqual(bufio.readinto(b), 2)
788 self.assertEqual(b, b"ab")
789 self.assertEqual(bufio.readinto(b), 2)
790 self.assertEqual(b, b"cd")
791 self.assertEqual(bufio.readinto(b), 2)
792 self.assertEqual(b, b"ef")
793 self.assertEqual(bufio.readinto(b), 1)
794 self.assertEqual(b, b"gf")
795 self.assertEqual(bufio.readinto(b), 0)
796 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +0200797 rawio = self.MockRawIO((b"abc", None))
798 bufio = self.tp(rawio)
799 self.assertEqual(bufio.readinto(b), 2)
800 self.assertEqual(b, b"ab")
801 self.assertEqual(bufio.readinto(b), 1)
802 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000803
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000804 def test_readlines(self):
805 def bufio():
806 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
807 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000808 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
809 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
810 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000811
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000812 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000813 data = b"abcdefghi"
814 dlen = len(data)
815
816 tests = [
817 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
818 [ 100, [ 3, 3, 3], [ dlen ] ],
819 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
820 ]
821
822 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000823 rawio = self.MockFileIO(data)
824 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000825 pos = 0
826 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000827 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000828 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000829 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000830 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000831
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000832 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000833 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000834 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
835 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000836 self.assertEqual(b"abcd", bufio.read(6))
837 self.assertEqual(b"e", bufio.read(1))
838 self.assertEqual(b"fg", bufio.read())
839 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200840 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000841 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000842
Victor Stinnera80987f2011-05-25 22:47:16 +0200843 rawio = self.MockRawIO((b"a", None, None))
844 self.assertEqual(b"a", rawio.readall())
845 self.assertIsNone(rawio.readall())
846
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000847 def test_read_past_eof(self):
848 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
849 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000850
Ezio Melottib3aedd42010-11-20 19:04:17 +0000851 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000852
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000853 def test_read_all(self):
854 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
855 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000856
Ezio Melottib3aedd42010-11-20 19:04:17 +0000857 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000858
Victor Stinner45df8202010-04-28 22:31:17 +0000859 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000860 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000861 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000862 try:
863 # Write out many bytes with exactly the same number of 0's,
864 # 1's... 255's. This will help us check that concurrent reading
865 # doesn't duplicate or forget contents.
866 N = 1000
867 l = list(range(256)) * N
868 random.shuffle(l)
869 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000870 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000871 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000872 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000873 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000874 errors = []
875 results = []
876 def f():
877 try:
878 # Intra-buffer read then buffer-flushing read
879 for n in cycle([1, 19]):
880 s = bufio.read(n)
881 if not s:
882 break
883 # list.append() is atomic
884 results.append(s)
885 except Exception as e:
886 errors.append(e)
887 raise
888 threads = [threading.Thread(target=f) for x in range(20)]
889 for t in threads:
890 t.start()
891 time.sleep(0.02) # yield
892 for t in threads:
893 t.join()
894 self.assertFalse(errors,
895 "the following exceptions were caught: %r" % errors)
896 s = b''.join(results)
897 for i in range(256):
898 c = bytes(bytearray([i]))
899 self.assertEqual(s.count(c), N)
900 finally:
901 support.unlink(support.TESTFN)
902
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000903 def test_misbehaved_io(self):
904 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
905 bufio = self.tp(rawio)
906 self.assertRaises(IOError, bufio.seek, 0)
907 self.assertRaises(IOError, bufio.tell)
908
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000909 def test_no_extraneous_read(self):
910 # Issue #9550; when the raw IO object has satisfied the read request,
911 # we should not issue any additional reads, otherwise it may block
912 # (e.g. socket).
913 bufsize = 16
914 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
915 rawio = self.MockRawIO([b"x" * n])
916 bufio = self.tp(rawio, bufsize)
917 self.assertEqual(bufio.read(n), b"x" * n)
918 # Simple case: one raw read is enough to satisfy the request.
919 self.assertEqual(rawio._extraneous_reads, 0,
920 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
921 # A more complex case where two raw reads are needed to satisfy
922 # the request.
923 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
924 bufio = self.tp(rawio, bufsize)
925 self.assertEqual(bufio.read(n), b"x" * n)
926 self.assertEqual(rawio._extraneous_reads, 0,
927 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
928
929
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000930class CBufferedReaderTest(BufferedReaderTest):
931 tp = io.BufferedReader
932
933 def test_constructor(self):
934 BufferedReaderTest.test_constructor(self)
935 # The allocation can succeed on 32-bit builds, e.g. with more
936 # than 2GB RAM and a 64-bit kernel.
937 if sys.maxsize > 0x7FFFFFFF:
938 rawio = self.MockRawIO()
939 bufio = self.tp(rawio)
940 self.assertRaises((OverflowError, MemoryError, ValueError),
941 bufio.__init__, rawio, sys.maxsize)
942
943 def test_initialization(self):
944 rawio = self.MockRawIO([b"abc"])
945 bufio = self.tp(rawio)
946 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
947 self.assertRaises(ValueError, bufio.read)
948 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
949 self.assertRaises(ValueError, bufio.read)
950 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
951 self.assertRaises(ValueError, bufio.read)
952
953 def test_misbehaved_io_read(self):
954 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
955 bufio = self.tp(rawio)
956 # _pyio.BufferedReader seems to implement reading different, so that
957 # checking this is not so easy.
958 self.assertRaises(IOError, bufio.read, 10)
959
960 def test_garbage_collection(self):
961 # C BufferedReader objects are collected.
962 # The Python version has __del__, so it ends into gc.garbage instead
963 rawio = self.FileIO(support.TESTFN, "w+b")
964 f = self.tp(rawio)
965 f.f = f
966 wr = weakref.ref(f)
967 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000968 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000969 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000970
971class PyBufferedReaderTest(BufferedReaderTest):
972 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000973
Guido van Rossuma9e20242007-03-08 00:43:48 +0000974
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000975class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
976 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000977
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000978 def test_constructor(self):
979 rawio = self.MockRawIO()
980 bufio = self.tp(rawio)
981 bufio.__init__(rawio)
982 bufio.__init__(rawio, buffer_size=1024)
983 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000984 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000985 bufio.flush()
986 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
987 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
988 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
989 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000990 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000991 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +0000992 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000993
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000994 def test_detach_flush(self):
995 raw = self.MockRawIO()
996 buf = self.tp(raw)
997 buf.write(b"howdy!")
998 self.assertFalse(raw._write_stack)
999 buf.detach()
1000 self.assertEqual(raw._write_stack, [b"howdy!"])
1001
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001002 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001003 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001004 writer = self.MockRawIO()
1005 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001006 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001007 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001008
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001009 def test_write_overflow(self):
1010 writer = self.MockRawIO()
1011 bufio = self.tp(writer, 8)
1012 contents = b"abcdefghijklmnop"
1013 for n in range(0, len(contents), 3):
1014 bufio.write(contents[n:n+3])
1015 flushed = b"".join(writer._write_stack)
1016 # At least (total - 8) bytes were implicitly flushed, perhaps more
1017 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001018 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001019
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001020 def check_writes(self, intermediate_func):
1021 # Lots of writes, test the flushed output is as expected.
1022 contents = bytes(range(256)) * 1000
1023 n = 0
1024 writer = self.MockRawIO()
1025 bufio = self.tp(writer, 13)
1026 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1027 def gen_sizes():
1028 for size in count(1):
1029 for i in range(15):
1030 yield size
1031 sizes = gen_sizes()
1032 while n < len(contents):
1033 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001034 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001035 intermediate_func(bufio)
1036 n += size
1037 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001038 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001039
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001040 def test_writes(self):
1041 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001042
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001043 def test_writes_and_flushes(self):
1044 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001045
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001046 def test_writes_and_seeks(self):
1047 def _seekabs(bufio):
1048 pos = bufio.tell()
1049 bufio.seek(pos + 1, 0)
1050 bufio.seek(pos - 1, 0)
1051 bufio.seek(pos, 0)
1052 self.check_writes(_seekabs)
1053 def _seekrel(bufio):
1054 pos = bufio.seek(0, 1)
1055 bufio.seek(+1, 1)
1056 bufio.seek(-1, 1)
1057 bufio.seek(pos, 0)
1058 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001059
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001060 def test_writes_and_truncates(self):
1061 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001062
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001063 def test_write_non_blocking(self):
1064 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001065 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001066
Ezio Melottib3aedd42010-11-20 19:04:17 +00001067 self.assertEqual(bufio.write(b"abcd"), 4)
1068 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001069 # 1 byte will be written, the rest will be buffered
1070 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001071 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001072
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001073 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1074 raw.block_on(b"0")
1075 try:
1076 bufio.write(b"opqrwxyz0123456789")
1077 except self.BlockingIOError as e:
1078 written = e.characters_written
1079 else:
1080 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001081 self.assertEqual(written, 16)
1082 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001083 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001084
Ezio Melottib3aedd42010-11-20 19:04:17 +00001085 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001086 s = raw.pop_written()
1087 # Previously buffered bytes were flushed
1088 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001089
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001090 def test_write_and_rewind(self):
1091 raw = io.BytesIO()
1092 bufio = self.tp(raw, 4)
1093 self.assertEqual(bufio.write(b"abcdef"), 6)
1094 self.assertEqual(bufio.tell(), 6)
1095 bufio.seek(0, 0)
1096 self.assertEqual(bufio.write(b"XY"), 2)
1097 bufio.seek(6, 0)
1098 self.assertEqual(raw.getvalue(), b"XYcdef")
1099 self.assertEqual(bufio.write(b"123456"), 6)
1100 bufio.flush()
1101 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001102
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001103 def test_flush(self):
1104 writer = self.MockRawIO()
1105 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001106 bufio.write(b"abc")
1107 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001108 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001109
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001110 def test_destructor(self):
1111 writer = self.MockRawIO()
1112 bufio = self.tp(writer, 8)
1113 bufio.write(b"abc")
1114 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001115 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001116 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001117
1118 def test_truncate(self):
1119 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001120 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001121 bufio = self.tp(raw, 8)
1122 bufio.write(b"abcdef")
1123 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001124 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001125 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001126 self.assertEqual(f.read(), b"abc")
1127
Victor Stinner45df8202010-04-28 22:31:17 +00001128 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001129 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001130 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001131 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001132 # Write out many bytes from many threads and test they were
1133 # all flushed.
1134 N = 1000
1135 contents = bytes(range(256)) * N
1136 sizes = cycle([1, 19])
1137 n = 0
1138 queue = deque()
1139 while n < len(contents):
1140 size = next(sizes)
1141 queue.append(contents[n:n+size])
1142 n += size
1143 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001144 # We use a real file object because it allows us to
1145 # exercise situations where the GIL is released before
1146 # writing the buffer to the raw streams. This is in addition
1147 # to concurrency issues due to switching threads in the middle
1148 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001149 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001150 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001151 errors = []
1152 def f():
1153 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001154 while True:
1155 try:
1156 s = queue.popleft()
1157 except IndexError:
1158 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001159 bufio.write(s)
1160 except Exception as e:
1161 errors.append(e)
1162 raise
1163 threads = [threading.Thread(target=f) for x in range(20)]
1164 for t in threads:
1165 t.start()
1166 time.sleep(0.02) # yield
1167 for t in threads:
1168 t.join()
1169 self.assertFalse(errors,
1170 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001171 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001172 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001173 s = f.read()
1174 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001175 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001176 finally:
1177 support.unlink(support.TESTFN)
1178
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001179 def test_misbehaved_io(self):
1180 rawio = self.MisbehavedRawIO()
1181 bufio = self.tp(rawio, 5)
1182 self.assertRaises(IOError, bufio.seek, 0)
1183 self.assertRaises(IOError, bufio.tell)
1184 self.assertRaises(IOError, bufio.write, b"abcdef")
1185
Benjamin Peterson59406a92009-03-26 17:10:29 +00001186 def test_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001187 with support.check_warnings(("max_buffer_size is deprecated",
1188 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001189 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001190
1191
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001192class CBufferedWriterTest(BufferedWriterTest):
1193 tp = io.BufferedWriter
1194
1195 def test_constructor(self):
1196 BufferedWriterTest.test_constructor(self)
1197 # The allocation can succeed on 32-bit builds, e.g. with more
1198 # than 2GB RAM and a 64-bit kernel.
1199 if sys.maxsize > 0x7FFFFFFF:
1200 rawio = self.MockRawIO()
1201 bufio = self.tp(rawio)
1202 self.assertRaises((OverflowError, MemoryError, ValueError),
1203 bufio.__init__, rawio, sys.maxsize)
1204
1205 def test_initialization(self):
1206 rawio = self.MockRawIO()
1207 bufio = self.tp(rawio)
1208 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1209 self.assertRaises(ValueError, bufio.write, b"def")
1210 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1211 self.assertRaises(ValueError, bufio.write, b"def")
1212 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1213 self.assertRaises(ValueError, bufio.write, b"def")
1214
1215 def test_garbage_collection(self):
1216 # C BufferedWriter objects are collected, and collecting them flushes
1217 # all data to disk.
1218 # The Python version has __del__, so it ends into gc.garbage instead
1219 rawio = self.FileIO(support.TESTFN, "w+b")
1220 f = self.tp(rawio)
1221 f.write(b"123xxx")
1222 f.x = f
1223 wr = weakref.ref(f)
1224 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001225 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001226 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001227 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001228 self.assertEqual(f.read(), b"123xxx")
1229
1230
1231class PyBufferedWriterTest(BufferedWriterTest):
1232 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001233
Guido van Rossum01a27522007-03-07 01:00:12 +00001234class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001235
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001236 def test_constructor(self):
1237 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001238 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001239
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001240 def test_detach(self):
1241 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1242 self.assertRaises(self.UnsupportedOperation, pair.detach)
1243
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001244 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001245 with support.check_warnings(("max_buffer_size is deprecated",
1246 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001247 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001248
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001249 def test_constructor_with_not_readable(self):
1250 class NotReadable(MockRawIO):
1251 def readable(self):
1252 return False
1253
1254 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1255
1256 def test_constructor_with_not_writeable(self):
1257 class NotWriteable(MockRawIO):
1258 def writable(self):
1259 return False
1260
1261 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1262
1263 def test_read(self):
1264 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1265
1266 self.assertEqual(pair.read(3), b"abc")
1267 self.assertEqual(pair.read(1), b"d")
1268 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001269 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1270 self.assertEqual(pair.read(None), b"abc")
1271
1272 def test_readlines(self):
1273 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1274 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1275 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1276 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001277
1278 def test_read1(self):
1279 # .read1() is delegated to the underlying reader object, so this test
1280 # can be shallow.
1281 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1282
1283 self.assertEqual(pair.read1(3), b"abc")
1284
1285 def test_readinto(self):
1286 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1287
1288 data = bytearray(5)
1289 self.assertEqual(pair.readinto(data), 5)
1290 self.assertEqual(data, b"abcde")
1291
1292 def test_write(self):
1293 w = self.MockRawIO()
1294 pair = self.tp(self.MockRawIO(), w)
1295
1296 pair.write(b"abc")
1297 pair.flush()
1298 pair.write(b"def")
1299 pair.flush()
1300 self.assertEqual(w._write_stack, [b"abc", b"def"])
1301
1302 def test_peek(self):
1303 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1304
1305 self.assertTrue(pair.peek(3).startswith(b"abc"))
1306 self.assertEqual(pair.read(3), b"abc")
1307
1308 def test_readable(self):
1309 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1310 self.assertTrue(pair.readable())
1311
1312 def test_writeable(self):
1313 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1314 self.assertTrue(pair.writable())
1315
1316 def test_seekable(self):
1317 # BufferedRWPairs are never seekable, even if their readers and writers
1318 # are.
1319 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1320 self.assertFalse(pair.seekable())
1321
1322 # .flush() is delegated to the underlying writer object and has been
1323 # tested in the test_write method.
1324
1325 def test_close_and_closed(self):
1326 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1327 self.assertFalse(pair.closed)
1328 pair.close()
1329 self.assertTrue(pair.closed)
1330
1331 def test_isatty(self):
1332 class SelectableIsAtty(MockRawIO):
1333 def __init__(self, isatty):
1334 MockRawIO.__init__(self)
1335 self._isatty = isatty
1336
1337 def isatty(self):
1338 return self._isatty
1339
1340 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1341 self.assertFalse(pair.isatty())
1342
1343 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1344 self.assertTrue(pair.isatty())
1345
1346 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1347 self.assertTrue(pair.isatty())
1348
1349 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1350 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001351
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001352class CBufferedRWPairTest(BufferedRWPairTest):
1353 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001354
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001355class PyBufferedRWPairTest(BufferedRWPairTest):
1356 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001357
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001358
1359class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1360 read_mode = "rb+"
1361 write_mode = "wb+"
1362
1363 def test_constructor(self):
1364 BufferedReaderTest.test_constructor(self)
1365 BufferedWriterTest.test_constructor(self)
1366
1367 def test_read_and_write(self):
1368 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001369 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001370
1371 self.assertEqual(b"as", rw.read(2))
1372 rw.write(b"ddd")
1373 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001374 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001375 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001376 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001377
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001378 def test_seek_and_tell(self):
1379 raw = self.BytesIO(b"asdfghjkl")
1380 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001381
Ezio Melottib3aedd42010-11-20 19:04:17 +00001382 self.assertEqual(b"as", rw.read(2))
1383 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001384 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001385 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001386
1387 rw.write(b"asdf")
1388 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001389 self.assertEqual(b"asdfasdfl", rw.read())
1390 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001391 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001392 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001393 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001394 self.assertEqual(7, rw.tell())
1395 self.assertEqual(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001396 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001397
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001398 def check_flush_and_read(self, read_func):
1399 raw = self.BytesIO(b"abcdefghi")
1400 bufio = self.tp(raw)
1401
Ezio Melottib3aedd42010-11-20 19:04:17 +00001402 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001403 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001404 self.assertEqual(b"ef", read_func(bufio, 2))
1405 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001406 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001407 self.assertEqual(6, bufio.tell())
1408 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001409 raw.seek(0, 0)
1410 raw.write(b"XYZ")
1411 # flush() resets the read buffer
1412 bufio.flush()
1413 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001414 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001415
1416 def test_flush_and_read(self):
1417 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1418
1419 def test_flush_and_readinto(self):
1420 def _readinto(bufio, n=-1):
1421 b = bytearray(n if n >= 0 else 9999)
1422 n = bufio.readinto(b)
1423 return bytes(b[:n])
1424 self.check_flush_and_read(_readinto)
1425
1426 def test_flush_and_peek(self):
1427 def _peek(bufio, n=-1):
1428 # This relies on the fact that the buffer can contain the whole
1429 # raw stream, otherwise peek() can return less.
1430 b = bufio.peek(n)
1431 if n != -1:
1432 b = b[:n]
1433 bufio.seek(len(b), 1)
1434 return b
1435 self.check_flush_and_read(_peek)
1436
1437 def test_flush_and_write(self):
1438 raw = self.BytesIO(b"abcdefghi")
1439 bufio = self.tp(raw)
1440
1441 bufio.write(b"123")
1442 bufio.flush()
1443 bufio.write(b"45")
1444 bufio.flush()
1445 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001446 self.assertEqual(b"12345fghi", raw.getvalue())
1447 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001448
1449 def test_threads(self):
1450 BufferedReaderTest.test_threads(self)
1451 BufferedWriterTest.test_threads(self)
1452
1453 def test_writes_and_peek(self):
1454 def _peek(bufio):
1455 bufio.peek(1)
1456 self.check_writes(_peek)
1457 def _peek(bufio):
1458 pos = bufio.tell()
1459 bufio.seek(-1, 1)
1460 bufio.peek(1)
1461 bufio.seek(pos, 0)
1462 self.check_writes(_peek)
1463
1464 def test_writes_and_reads(self):
1465 def _read(bufio):
1466 bufio.seek(-1, 1)
1467 bufio.read(1)
1468 self.check_writes(_read)
1469
1470 def test_writes_and_read1s(self):
1471 def _read1(bufio):
1472 bufio.seek(-1, 1)
1473 bufio.read1(1)
1474 self.check_writes(_read1)
1475
1476 def test_writes_and_readintos(self):
1477 def _read(bufio):
1478 bufio.seek(-1, 1)
1479 bufio.readinto(bytearray(1))
1480 self.check_writes(_read)
1481
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001482 def test_write_after_readahead(self):
1483 # Issue #6629: writing after the buffer was filled by readahead should
1484 # first rewind the raw stream.
1485 for overwrite_size in [1, 5]:
1486 raw = self.BytesIO(b"A" * 10)
1487 bufio = self.tp(raw, 4)
1488 # Trigger readahead
1489 self.assertEqual(bufio.read(1), b"A")
1490 self.assertEqual(bufio.tell(), 1)
1491 # Overwriting should rewind the raw stream if it needs so
1492 bufio.write(b"B" * overwrite_size)
1493 self.assertEqual(bufio.tell(), overwrite_size + 1)
1494 # If the write size was smaller than the buffer size, flush() and
1495 # check that rewind happens.
1496 bufio.flush()
1497 self.assertEqual(bufio.tell(), overwrite_size + 1)
1498 s = raw.getvalue()
1499 self.assertEqual(s,
1500 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1501
Antoine Pitrou7c404892011-05-13 00:13:33 +02001502 def test_write_rewind_write(self):
1503 # Various combinations of reading / writing / seeking backwards / writing again
1504 def mutate(bufio, pos1, pos2):
1505 assert pos2 >= pos1
1506 # Fill the buffer
1507 bufio.seek(pos1)
1508 bufio.read(pos2 - pos1)
1509 bufio.write(b'\x02')
1510 # This writes earlier than the previous write, but still inside
1511 # the buffer.
1512 bufio.seek(pos1)
1513 bufio.write(b'\x01')
1514
1515 b = b"\x80\x81\x82\x83\x84"
1516 for i in range(0, len(b)):
1517 for j in range(i, len(b)):
1518 raw = self.BytesIO(b)
1519 bufio = self.tp(raw, 100)
1520 mutate(bufio, i, j)
1521 bufio.flush()
1522 expected = bytearray(b)
1523 expected[j] = 2
1524 expected[i] = 1
1525 self.assertEqual(raw.getvalue(), expected,
1526 "failed result for i=%d, j=%d" % (i, j))
1527
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001528 def test_truncate_after_read_or_write(self):
1529 raw = self.BytesIO(b"A" * 10)
1530 bufio = self.tp(raw, 100)
1531 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1532 self.assertEqual(bufio.truncate(), 2)
1533 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1534 self.assertEqual(bufio.truncate(), 4)
1535
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001536 def test_misbehaved_io(self):
1537 BufferedReaderTest.test_misbehaved_io(self)
1538 BufferedWriterTest.test_misbehaved_io(self)
1539
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001540 # You can't construct a BufferedRandom over a non-seekable stream.
1541 test_unseekable = None
1542
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001543class CBufferedRandomTest(BufferedRandomTest):
1544 tp = io.BufferedRandom
1545
1546 def test_constructor(self):
1547 BufferedRandomTest.test_constructor(self)
1548 # The allocation can succeed on 32-bit builds, e.g. with more
1549 # than 2GB RAM and a 64-bit kernel.
1550 if sys.maxsize > 0x7FFFFFFF:
1551 rawio = self.MockRawIO()
1552 bufio = self.tp(rawio)
1553 self.assertRaises((OverflowError, MemoryError, ValueError),
1554 bufio.__init__, rawio, sys.maxsize)
1555
1556 def test_garbage_collection(self):
1557 CBufferedReaderTest.test_garbage_collection(self)
1558 CBufferedWriterTest.test_garbage_collection(self)
1559
1560class PyBufferedRandomTest(BufferedRandomTest):
1561 tp = pyio.BufferedRandom
1562
1563
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001564# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1565# properties:
1566# - A single output character can correspond to many bytes of input.
1567# - The number of input bytes to complete the character can be
1568# undetermined until the last input byte is received.
1569# - The number of input bytes can vary depending on previous input.
1570# - A single input byte can correspond to many characters of output.
1571# - The number of output characters can be undetermined until the
1572# last input byte is received.
1573# - The number of output characters can vary depending on previous input.
1574
1575class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1576 """
1577 For testing seek/tell behavior with a stateful, buffering decoder.
1578
1579 Input is a sequence of words. Words may be fixed-length (length set
1580 by input) or variable-length (period-terminated). In variable-length
1581 mode, extra periods are ignored. Possible words are:
1582 - 'i' followed by a number sets the input length, I (maximum 99).
1583 When I is set to 0, words are space-terminated.
1584 - 'o' followed by a number sets the output length, O (maximum 99).
1585 - Any other word is converted into a word followed by a period on
1586 the output. The output word consists of the input word truncated
1587 or padded out with hyphens to make its length equal to O. If O
1588 is 0, the word is output verbatim without truncating or padding.
1589 I and O are initially set to 1. When I changes, any buffered input is
1590 re-scanned according to the new I. EOF also terminates the last word.
1591 """
1592
1593 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001594 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001595 self.reset()
1596
1597 def __repr__(self):
1598 return '<SID %x>' % id(self)
1599
1600 def reset(self):
1601 self.i = 1
1602 self.o = 1
1603 self.buffer = bytearray()
1604
1605 def getstate(self):
1606 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1607 return bytes(self.buffer), i*100 + o
1608
1609 def setstate(self, state):
1610 buffer, io = state
1611 self.buffer = bytearray(buffer)
1612 i, o = divmod(io, 100)
1613 self.i, self.o = i ^ 1, o ^ 1
1614
1615 def decode(self, input, final=False):
1616 output = ''
1617 for b in input:
1618 if self.i == 0: # variable-length, terminated with period
1619 if b == ord('.'):
1620 if self.buffer:
1621 output += self.process_word()
1622 else:
1623 self.buffer.append(b)
1624 else: # fixed-length, terminate after self.i bytes
1625 self.buffer.append(b)
1626 if len(self.buffer) == self.i:
1627 output += self.process_word()
1628 if final and self.buffer: # EOF terminates the last word
1629 output += self.process_word()
1630 return output
1631
1632 def process_word(self):
1633 output = ''
1634 if self.buffer[0] == ord('i'):
1635 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1636 elif self.buffer[0] == ord('o'):
1637 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1638 else:
1639 output = self.buffer.decode('ascii')
1640 if len(output) < self.o:
1641 output += '-'*self.o # pad out with hyphens
1642 if self.o:
1643 output = output[:self.o] # truncate to output length
1644 output += '.'
1645 self.buffer = bytearray()
1646 return output
1647
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001648 codecEnabled = False
1649
1650 @classmethod
1651 def lookupTestDecoder(cls, name):
1652 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001653 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001654 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001655 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001656 incrementalencoder=None,
1657 streamreader=None, streamwriter=None,
1658 incrementaldecoder=cls)
1659
1660# Register the previous decoder for testing.
1661# Disabled by default, tests will enable it.
1662codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1663
1664
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001665class StatefulIncrementalDecoderTest(unittest.TestCase):
1666 """
1667 Make sure the StatefulIncrementalDecoder actually works.
1668 """
1669
1670 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001671 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001672 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001673 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001674 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001675 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001676 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001677 # I=0, O=6 (variable-length input, fixed-length output)
1678 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1679 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001680 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001681 # I=6, O=3 (fixed-length input > fixed-length output)
1682 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1683 # I=0, then 3; O=29, then 15 (with longer output)
1684 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1685 'a----------------------------.' +
1686 'b----------------------------.' +
1687 'cde--------------------------.' +
1688 'abcdefghijabcde.' +
1689 'a.b------------.' +
1690 '.c.------------.' +
1691 'd.e------------.' +
1692 'k--------------.' +
1693 'l--------------.' +
1694 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001695 ]
1696
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001697 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001698 # Try a few one-shot test cases.
1699 for input, eof, output in self.test_cases:
1700 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001701 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001702
1703 # Also test an unfinished decode, followed by forcing EOF.
1704 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001705 self.assertEqual(d.decode(b'oiabcd'), '')
1706 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001707
1708class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001709
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001710 def setUp(self):
1711 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1712 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001713 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001714
Guido van Rossumd0712812007-04-11 16:32:43 +00001715 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001716 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001717
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001718 def test_constructor(self):
1719 r = self.BytesIO(b"\xc3\xa9\n\n")
1720 b = self.BufferedReader(r, 1000)
1721 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001722 t.__init__(b, encoding="latin-1", newline="\r\n")
1723 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001724 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001725 t.__init__(b, encoding="utf-8", line_buffering=True)
1726 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001727 self.assertEqual(t.line_buffering, True)
1728 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001729 self.assertRaises(TypeError, t.__init__, b, newline=42)
1730 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1731
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001732 def test_detach(self):
1733 r = self.BytesIO()
1734 b = self.BufferedWriter(r)
1735 t = self.TextIOWrapper(b)
1736 self.assertIs(t.detach(), b)
1737
1738 t = self.TextIOWrapper(b, encoding="ascii")
1739 t.write("howdy")
1740 self.assertFalse(r.getvalue())
1741 t.detach()
1742 self.assertEqual(r.getvalue(), b"howdy")
1743 self.assertRaises(ValueError, t.detach)
1744
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001745 def test_repr(self):
1746 raw = self.BytesIO("hello".encode("utf-8"))
1747 b = self.BufferedReader(raw)
1748 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001749 modname = self.TextIOWrapper.__module__
1750 self.assertEqual(repr(t),
1751 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1752 raw.name = "dummy"
1753 self.assertEqual(repr(t),
1754 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001755 t.mode = "r"
1756 self.assertEqual(repr(t),
1757 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001758 raw.name = b"dummy"
1759 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001760 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001761
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001762 def test_line_buffering(self):
1763 r = self.BytesIO()
1764 b = self.BufferedWriter(r, 1000)
1765 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001766 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001767 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001768 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001769 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001770 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001771 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001772
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001773 def test_encoding(self):
1774 # Check the encoding attribute is always set, and valid
1775 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001776 t = self.TextIOWrapper(b, encoding="utf-8")
1777 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001778 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001779 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001780 codecs.lookup(t.encoding)
1781
1782 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001783 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001784 b = self.BytesIO(b"abc\n\xff\n")
1785 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001786 self.assertRaises(UnicodeError, t.read)
1787 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001788 b = self.BytesIO(b"abc\n\xff\n")
1789 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001790 self.assertRaises(UnicodeError, t.read)
1791 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001792 b = self.BytesIO(b"abc\n\xff\n")
1793 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001794 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001795 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001796 b = self.BytesIO(b"abc\n\xff\n")
1797 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001798 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001799
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001800 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001801 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001802 b = self.BytesIO()
1803 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001804 self.assertRaises(UnicodeError, t.write, "\xff")
1805 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001806 b = self.BytesIO()
1807 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001808 self.assertRaises(UnicodeError, t.write, "\xff")
1809 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001810 b = self.BytesIO()
1811 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001812 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001813 t.write("abc\xffdef\n")
1814 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001815 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001816 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001817 b = self.BytesIO()
1818 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001819 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001820 t.write("abc\xffdef\n")
1821 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001822 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001823
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001824 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001825 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1826
1827 tests = [
1828 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001829 [ '', input_lines ],
1830 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1831 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1832 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001833 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001834 encodings = (
1835 'utf-8', 'latin-1',
1836 'utf-16', 'utf-16-le', 'utf-16-be',
1837 'utf-32', 'utf-32-le', 'utf-32-be',
1838 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001839
Guido van Rossum8358db22007-08-18 21:39:55 +00001840 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001841 # character in TextIOWrapper._pending_line.
1842 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001843 # XXX: str.encode() should return bytes
1844 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001845 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001846 for bufsize in range(1, 10):
1847 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001848 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1849 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001850 encoding=encoding)
1851 if do_reads:
1852 got_lines = []
1853 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001854 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001855 if c2 == '':
1856 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00001857 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001858 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001859 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001860 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001861
1862 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001863 self.assertEqual(got_line, exp_line)
1864 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001865
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001866 def test_newlines_input(self):
1867 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001868 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1869 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001870 (None, normalized.decode("ascii").splitlines(True)),
1871 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001872 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1873 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1874 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001875 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001876 buf = self.BytesIO(testdata)
1877 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001878 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00001879 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001880 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001881
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001882 def test_newlines_output(self):
1883 testdict = {
1884 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1885 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1886 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1887 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1888 }
1889 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1890 for newline, expected in tests:
1891 buf = self.BytesIO()
1892 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1893 txt.write("AAA\nB")
1894 txt.write("BB\nCCC\n")
1895 txt.write("X\rY\r\nZ")
1896 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001897 self.assertEqual(buf.closed, False)
1898 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001899
1900 def test_destructor(self):
1901 l = []
1902 base = self.BytesIO
1903 class MyBytesIO(base):
1904 def close(self):
1905 l.append(self.getvalue())
1906 base.close(self)
1907 b = MyBytesIO()
1908 t = self.TextIOWrapper(b, encoding="ascii")
1909 t.write("abc")
1910 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001911 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001912 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001913
1914 def test_override_destructor(self):
1915 record = []
1916 class MyTextIO(self.TextIOWrapper):
1917 def __del__(self):
1918 record.append(1)
1919 try:
1920 f = super().__del__
1921 except AttributeError:
1922 pass
1923 else:
1924 f()
1925 def close(self):
1926 record.append(2)
1927 super().close()
1928 def flush(self):
1929 record.append(3)
1930 super().flush()
1931 b = self.BytesIO()
1932 t = MyTextIO(b, encoding="ascii")
1933 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001934 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001935 self.assertEqual(record, [1, 2, 3])
1936
1937 def test_error_through_destructor(self):
1938 # Test that the exception state is not modified by a destructor,
1939 # even if close() fails.
1940 rawio = self.CloseFailureIO()
1941 def f():
1942 self.TextIOWrapper(rawio).xyzzy
1943 with support.captured_output("stderr") as s:
1944 self.assertRaises(AttributeError, f)
1945 s = s.getvalue().strip()
1946 if s:
1947 # The destructor *may* have printed an unraisable error, check it
1948 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001949 self.assertTrue(s.startswith("Exception IOError: "), s)
1950 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001951
Guido van Rossum9b76da62007-04-11 01:09:03 +00001952 # Systematic tests of the text I/O API
1953
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001954 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001955 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001956 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001957 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001958 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00001959 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001960 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001961 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001962 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00001963 self.assertEqual(f.tell(), 0)
1964 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001965 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001966 self.assertEqual(f.seek(0), 0)
1967 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001968 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001969 self.assertEqual(f.read(2), "ab")
1970 self.assertEqual(f.read(1), "c")
1971 self.assertEqual(f.read(1), "")
1972 self.assertEqual(f.read(), "")
1973 self.assertEqual(f.tell(), cookie)
1974 self.assertEqual(f.seek(0), 0)
1975 self.assertEqual(f.seek(0, 2), cookie)
1976 self.assertEqual(f.write("def"), 3)
1977 self.assertEqual(f.seek(cookie), cookie)
1978 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001979 if enc.startswith("utf"):
1980 self.multi_line_test(f, enc)
1981 f.close()
1982
1983 def multi_line_test(self, f, enc):
1984 f.seek(0)
1985 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001986 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001987 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001988 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001989 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001990 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001991 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001992 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001993 wlines.append((f.tell(), line))
1994 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001995 f.seek(0)
1996 rlines = []
1997 while True:
1998 pos = f.tell()
1999 line = f.readline()
2000 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002001 break
2002 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002003 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002004
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002005 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002006 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002007 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002008 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002009 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002010 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002011 p2 = f.tell()
2012 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002013 self.assertEqual(f.tell(), p0)
2014 self.assertEqual(f.readline(), "\xff\n")
2015 self.assertEqual(f.tell(), p1)
2016 self.assertEqual(f.readline(), "\xff\n")
2017 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002018 f.seek(0)
2019 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002020 self.assertEqual(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002021 self.assertRaises(IOError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002022 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002023 f.close()
2024
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002025 def test_seeking(self):
2026 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002027 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002028 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002029 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002030 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002031 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002032 suffix = bytes(u_suffix.encode("utf-8"))
2033 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002034 with self.open(support.TESTFN, "wb") as f:
2035 f.write(line*2)
2036 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2037 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002038 self.assertEqual(s, str(prefix, "ascii"))
2039 self.assertEqual(f.tell(), prefix_size)
2040 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002041
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002042 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002043 # Regression test for a specific bug
2044 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002045 with self.open(support.TESTFN, "wb") as f:
2046 f.write(data)
2047 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2048 f._CHUNK_SIZE # Just test that it exists
2049 f._CHUNK_SIZE = 2
2050 f.readline()
2051 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002052
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002053 def test_seek_and_tell(self):
2054 #Test seek/tell using the StatefulIncrementalDecoder.
2055 # Make test faster by doing smaller seeks
2056 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002057
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002058 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002059 """Tell/seek to various points within a data stream and ensure
2060 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002061 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002062 f.write(data)
2063 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002064 f = self.open(support.TESTFN, encoding='test_decoder')
2065 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002066 decoded = f.read()
2067 f.close()
2068
Neal Norwitze2b07052008-03-18 19:52:05 +00002069 for i in range(min_pos, len(decoded) + 1): # seek positions
2070 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002071 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002072 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002073 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002074 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002075 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002076 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002077 f.close()
2078
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002079 # Enable the test decoder.
2080 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002081
2082 # Run the tests.
2083 try:
2084 # Try each test case.
2085 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002086 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002087
2088 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002089 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2090 offset = CHUNK_SIZE - len(input)//2
2091 prefix = b'.'*offset
2092 # Don't bother seeking into the prefix (takes too long).
2093 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002094 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002095
2096 # Ensure our test decoder won't interfere with subsequent tests.
2097 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002098 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002099
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002100 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002101 data = "1234567890"
2102 tests = ("utf-16",
2103 "utf-16-le",
2104 "utf-16-be",
2105 "utf-32",
2106 "utf-32-le",
2107 "utf-32-be")
2108 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002109 buf = self.BytesIO()
2110 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002111 # Check if the BOM is written only once (see issue1753).
2112 f.write(data)
2113 f.write(data)
2114 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002115 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002116 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002117 self.assertEqual(f.read(), data * 2)
2118 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002119
Benjamin Petersona1b49012009-03-31 23:11:32 +00002120 def test_unreadable(self):
2121 class UnReadable(self.BytesIO):
2122 def readable(self):
2123 return False
2124 txt = self.TextIOWrapper(UnReadable())
2125 self.assertRaises(IOError, txt.read)
2126
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002127 def test_read_one_by_one(self):
2128 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002129 reads = ""
2130 while True:
2131 c = txt.read(1)
2132 if not c:
2133 break
2134 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002135 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002136
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002137 def test_readlines(self):
2138 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2139 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2140 txt.seek(0)
2141 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2142 txt.seek(0)
2143 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2144
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002145 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002146 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002147 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002148 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002149 reads = ""
2150 while True:
2151 c = txt.read(128)
2152 if not c:
2153 break
2154 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002155 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002156
2157 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002158 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002159
2160 # read one char at a time
2161 reads = ""
2162 while True:
2163 c = txt.read(1)
2164 if not c:
2165 break
2166 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002167 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002168
2169 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002170 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002171 txt._CHUNK_SIZE = 4
2172
2173 reads = ""
2174 while True:
2175 c = txt.read(4)
2176 if not c:
2177 break
2178 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002179 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002180
2181 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002182 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002183 txt._CHUNK_SIZE = 4
2184
2185 reads = txt.read(4)
2186 reads += txt.read(4)
2187 reads += txt.readline()
2188 reads += txt.readline()
2189 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002190 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002191
2192 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002193 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002194 txt._CHUNK_SIZE = 4
2195
2196 reads = txt.read(4)
2197 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002198 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002199
2200 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002201 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002202 txt._CHUNK_SIZE = 4
2203
2204 reads = txt.read(4)
2205 pos = txt.tell()
2206 txt.seek(0)
2207 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002208 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002209
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002210 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002211 buffer = self.BytesIO(self.testdata)
2212 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002213
2214 self.assertEqual(buffer.seekable(), txt.seekable())
2215
Antoine Pitroue4501852009-05-14 18:55:55 +00002216 def test_append_bom(self):
2217 # The BOM is not written again when appending to a non-empty file
2218 filename = support.TESTFN
2219 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2220 with self.open(filename, 'w', encoding=charset) as f:
2221 f.write('aaa')
2222 pos = f.tell()
2223 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002224 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002225
2226 with self.open(filename, 'a', encoding=charset) as f:
2227 f.write('xxx')
2228 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002229 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002230
2231 def test_seek_bom(self):
2232 # Same test, but when seeking manually
2233 filename = support.TESTFN
2234 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2235 with self.open(filename, 'w', encoding=charset) as f:
2236 f.write('aaa')
2237 pos = f.tell()
2238 with self.open(filename, 'r+', encoding=charset) as f:
2239 f.seek(pos)
2240 f.write('zzz')
2241 f.seek(0)
2242 f.write('bbb')
2243 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002244 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002245
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002246 def test_errors_property(self):
2247 with self.open(support.TESTFN, "w") as f:
2248 self.assertEqual(f.errors, "strict")
2249 with self.open(support.TESTFN, "w", errors="replace") as f:
2250 self.assertEqual(f.errors, "replace")
2251
Brett Cannon31f59292011-02-21 19:29:56 +00002252 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002253 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002254 def test_threads_write(self):
2255 # Issue6750: concurrent writes could duplicate data
2256 event = threading.Event()
2257 with self.open(support.TESTFN, "w", buffering=1) as f:
2258 def run(n):
2259 text = "Thread%03d\n" % n
2260 event.wait()
2261 f.write(text)
2262 threads = [threading.Thread(target=lambda n=x: run(n))
2263 for x in range(20)]
2264 for t in threads:
2265 t.start()
2266 time.sleep(0.02)
2267 event.set()
2268 for t in threads:
2269 t.join()
2270 with self.open(support.TESTFN) as f:
2271 content = f.read()
2272 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002273 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002274
Antoine Pitrou6be88762010-05-03 16:48:20 +00002275 def test_flush_error_on_close(self):
2276 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2277 def bad_flush():
2278 raise IOError()
2279 txt.flush = bad_flush
2280 self.assertRaises(IOError, txt.close) # exception not swallowed
2281
2282 def test_multi_close(self):
2283 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2284 txt.close()
2285 txt.close()
2286 txt.close()
2287 self.assertRaises(ValueError, txt.flush)
2288
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002289 def test_unseekable(self):
2290 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2291 self.assertRaises(self.UnsupportedOperation, txt.tell)
2292 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2293
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002294 def test_readonly_attributes(self):
2295 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2296 buf = self.BytesIO(self.testdata)
2297 with self.assertRaises(AttributeError):
2298 txt.buffer = buf
2299
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002300class CTextIOWrapperTest(TextIOWrapperTest):
2301
2302 def test_initialization(self):
2303 r = self.BytesIO(b"\xc3\xa9\n\n")
2304 b = self.BufferedReader(r, 1000)
2305 t = self.TextIOWrapper(b)
2306 self.assertRaises(TypeError, t.__init__, b, newline=42)
2307 self.assertRaises(ValueError, t.read)
2308 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2309 self.assertRaises(ValueError, t.read)
2310
2311 def test_garbage_collection(self):
2312 # C TextIOWrapper objects are collected, and collecting them flushes
2313 # all data to disk.
2314 # The Python version has __del__, so it ends in gc.garbage instead.
2315 rawio = io.FileIO(support.TESTFN, "wb")
2316 b = self.BufferedWriter(rawio)
2317 t = self.TextIOWrapper(b, encoding="ascii")
2318 t.write("456def")
2319 t.x = t
2320 wr = weakref.ref(t)
2321 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002322 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002323 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002324 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002325 self.assertEqual(f.read(), b"456def")
2326
2327class PyTextIOWrapperTest(TextIOWrapperTest):
2328 pass
2329
2330
2331class IncrementalNewlineDecoderTest(unittest.TestCase):
2332
2333 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002334 # UTF-8 specific tests for a newline decoder
2335 def _check_decode(b, s, **kwargs):
2336 # We exercise getstate() / setstate() as well as decode()
2337 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002338 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002339 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002340 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002341
Antoine Pitrou180a3362008-12-14 16:36:46 +00002342 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002343
Antoine Pitrou180a3362008-12-14 16:36:46 +00002344 _check_decode(b'\xe8', "")
2345 _check_decode(b'\xa2', "")
2346 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002347
Antoine Pitrou180a3362008-12-14 16:36:46 +00002348 _check_decode(b'\xe8', "")
2349 _check_decode(b'\xa2', "")
2350 _check_decode(b'\x88', "\u8888")
2351
2352 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002353 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2354
Antoine Pitrou180a3362008-12-14 16:36:46 +00002355 decoder.reset()
2356 _check_decode(b'\n', "\n")
2357 _check_decode(b'\r', "")
2358 _check_decode(b'', "\n", final=True)
2359 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002360
Antoine Pitrou180a3362008-12-14 16:36:46 +00002361 _check_decode(b'\r', "")
2362 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002363
Antoine Pitrou180a3362008-12-14 16:36:46 +00002364 _check_decode(b'\r\r\n', "\n\n")
2365 _check_decode(b'\r', "")
2366 _check_decode(b'\r', "\n")
2367 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002368
Antoine Pitrou180a3362008-12-14 16:36:46 +00002369 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2370 _check_decode(b'\xe8\xa2\x88', "\u8888")
2371 _check_decode(b'\n', "\n")
2372 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2373 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002374
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002375 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002376 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002377 if encoding is not None:
2378 encoder = codecs.getincrementalencoder(encoding)()
2379 def _decode_bytewise(s):
2380 # Decode one byte at a time
2381 for b in encoder.encode(s):
2382 result.append(decoder.decode(bytes([b])))
2383 else:
2384 encoder = None
2385 def _decode_bytewise(s):
2386 # Decode one char at a time
2387 for c in s:
2388 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002389 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002390 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002391 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002392 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002393 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002394 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002395 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002396 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002397 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002398 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002399 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002400 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002401 input = "abc"
2402 if encoder is not None:
2403 encoder.reset()
2404 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002405 self.assertEqual(decoder.decode(input), "abc")
2406 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002407
2408 def test_newline_decoder(self):
2409 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002410 # None meaning the IncrementalNewlineDecoder takes unicode input
2411 # rather than bytes input
2412 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002413 'utf-16', 'utf-16-le', 'utf-16-be',
2414 'utf-32', 'utf-32-le', 'utf-32-be',
2415 )
2416 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002417 decoder = enc and codecs.getincrementaldecoder(enc)()
2418 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2419 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002420 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002421 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2422 self.check_newline_decoding_utf8(decoder)
2423
Antoine Pitrou66913e22009-03-06 23:40:56 +00002424 def test_newline_bytes(self):
2425 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2426 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002427 self.assertEqual(dec.newlines, None)
2428 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2429 self.assertEqual(dec.newlines, None)
2430 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2431 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002432 dec = self.IncrementalNewlineDecoder(None, translate=False)
2433 _check(dec)
2434 dec = self.IncrementalNewlineDecoder(None, translate=True)
2435 _check(dec)
2436
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002437class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2438 pass
2439
2440class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2441 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002442
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002443
Guido van Rossum01a27522007-03-07 01:00:12 +00002444# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002445
Guido van Rossum5abbf752007-08-27 17:39:33 +00002446class MiscIOTest(unittest.TestCase):
2447
Barry Warsaw40e82462008-11-20 20:14:50 +00002448 def tearDown(self):
2449 support.unlink(support.TESTFN)
2450
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002451 def test___all__(self):
2452 for name in self.io.__all__:
2453 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002454 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002455 if name == "open":
2456 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002457 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002458 self.assertTrue(issubclass(obj, Exception), name)
2459 elif not name.startswith("SEEK_"):
2460 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002461
Barry Warsaw40e82462008-11-20 20:14:50 +00002462 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002463 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002464 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002465 f.close()
2466
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002467 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002468 self.assertEqual(f.name, support.TESTFN)
2469 self.assertEqual(f.buffer.name, support.TESTFN)
2470 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2471 self.assertEqual(f.mode, "U")
2472 self.assertEqual(f.buffer.mode, "rb")
2473 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002474 f.close()
2475
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002476 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002477 self.assertEqual(f.mode, "w+")
2478 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2479 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002480
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002481 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002482 self.assertEqual(g.mode, "wb")
2483 self.assertEqual(g.raw.mode, "wb")
2484 self.assertEqual(g.name, f.fileno())
2485 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002486 f.close()
2487 g.close()
2488
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002489 def test_io_after_close(self):
2490 for kwargs in [
2491 {"mode": "w"},
2492 {"mode": "wb"},
2493 {"mode": "w", "buffering": 1},
2494 {"mode": "w", "buffering": 2},
2495 {"mode": "wb", "buffering": 0},
2496 {"mode": "r"},
2497 {"mode": "rb"},
2498 {"mode": "r", "buffering": 1},
2499 {"mode": "r", "buffering": 2},
2500 {"mode": "rb", "buffering": 0},
2501 {"mode": "w+"},
2502 {"mode": "w+b"},
2503 {"mode": "w+", "buffering": 1},
2504 {"mode": "w+", "buffering": 2},
2505 {"mode": "w+b", "buffering": 0},
2506 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002507 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002508 f.close()
2509 self.assertRaises(ValueError, f.flush)
2510 self.assertRaises(ValueError, f.fileno)
2511 self.assertRaises(ValueError, f.isatty)
2512 self.assertRaises(ValueError, f.__iter__)
2513 if hasattr(f, "peek"):
2514 self.assertRaises(ValueError, f.peek, 1)
2515 self.assertRaises(ValueError, f.read)
2516 if hasattr(f, "read1"):
2517 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002518 if hasattr(f, "readall"):
2519 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002520 if hasattr(f, "readinto"):
2521 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2522 self.assertRaises(ValueError, f.readline)
2523 self.assertRaises(ValueError, f.readlines)
2524 self.assertRaises(ValueError, f.seek, 0)
2525 self.assertRaises(ValueError, f.tell)
2526 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002527 self.assertRaises(ValueError, f.write,
2528 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002529 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002530 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002531
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002532 def test_blockingioerror(self):
2533 # Various BlockingIOError issues
2534 self.assertRaises(TypeError, self.BlockingIOError)
2535 self.assertRaises(TypeError, self.BlockingIOError, 1)
2536 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2537 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2538 b = self.BlockingIOError(1, "")
2539 self.assertEqual(b.characters_written, 0)
2540 class C(str):
2541 pass
2542 c = C("")
2543 b = self.BlockingIOError(1, c)
2544 c.b = b
2545 b.c = c
2546 wr = weakref.ref(c)
2547 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002548 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002549 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002550
2551 def test_abcs(self):
2552 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002553 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2554 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2555 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2556 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002557
2558 def _check_abc_inheritance(self, abcmodule):
2559 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002560 self.assertIsInstance(f, abcmodule.IOBase)
2561 self.assertIsInstance(f, abcmodule.RawIOBase)
2562 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2563 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002564 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002565 self.assertIsInstance(f, abcmodule.IOBase)
2566 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2567 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2568 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002569 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002570 self.assertIsInstance(f, abcmodule.IOBase)
2571 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2572 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2573 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002574
2575 def test_abc_inheritance(self):
2576 # Test implementations inherit from their respective ABCs
2577 self._check_abc_inheritance(self)
2578
2579 def test_abc_inheritance_official(self):
2580 # Test implementations inherit from the official ABCs of the
2581 # baseline "io" module.
2582 self._check_abc_inheritance(io)
2583
Antoine Pitroue033e062010-10-29 10:38:18 +00002584 def _check_warn_on_dealloc(self, *args, **kwargs):
2585 f = open(*args, **kwargs)
2586 r = repr(f)
2587 with self.assertWarns(ResourceWarning) as cm:
2588 f = None
2589 support.gc_collect()
2590 self.assertIn(r, str(cm.warning.args[0]))
2591
2592 def test_warn_on_dealloc(self):
2593 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2594 self._check_warn_on_dealloc(support.TESTFN, "wb")
2595 self._check_warn_on_dealloc(support.TESTFN, "w")
2596
2597 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2598 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002599 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002600 for fd in fds:
2601 try:
2602 os.close(fd)
2603 except EnvironmentError as e:
2604 if e.errno != errno.EBADF:
2605 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002606 self.addCleanup(cleanup_fds)
2607 r, w = os.pipe()
2608 fds += r, w
2609 self._check_warn_on_dealloc(r, *args, **kwargs)
2610 # When using closefd=False, there's no warning
2611 r, w = os.pipe()
2612 fds += r, w
2613 with warnings.catch_warnings(record=True) as recorded:
2614 open(r, *args, closefd=False, **kwargs)
2615 support.gc_collect()
2616 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002617
2618 def test_warn_on_dealloc_fd(self):
2619 self._check_warn_on_dealloc_fd("rb", buffering=0)
2620 self._check_warn_on_dealloc_fd("rb")
2621 self._check_warn_on_dealloc_fd("r")
2622
2623
Antoine Pitrou243757e2010-11-05 21:15:39 +00002624 def test_pickling(self):
2625 # Pickling file objects is forbidden
2626 for kwargs in [
2627 {"mode": "w"},
2628 {"mode": "wb"},
2629 {"mode": "wb", "buffering": 0},
2630 {"mode": "r"},
2631 {"mode": "rb"},
2632 {"mode": "rb", "buffering": 0},
2633 {"mode": "w+"},
2634 {"mode": "w+b"},
2635 {"mode": "w+b", "buffering": 0},
2636 ]:
2637 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2638 with self.open(support.TESTFN, **kwargs) as f:
2639 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2640
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002641class CMiscIOTest(MiscIOTest):
2642 io = io
2643
2644class PyMiscIOTest(MiscIOTest):
2645 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002646
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002647
2648@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2649class SignalsTest(unittest.TestCase):
2650
2651 def setUp(self):
2652 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2653
2654 def tearDown(self):
2655 signal.signal(signal.SIGALRM, self.oldalrm)
2656
2657 def alarm_interrupt(self, sig, frame):
2658 1/0
2659
2660 @unittest.skipUnless(threading, 'Threading required for this test.')
2661 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2662 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00002663 invokes the signal handler, and bubbles up the exception raised
2664 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002665 read_results = []
2666 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02002667 if hasattr(signal, 'pthread_sigmask'):
2668 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002669 s = os.read(r, 1)
2670 read_results.append(s)
2671 t = threading.Thread(target=_read)
2672 t.daemon = True
2673 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00002674 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002675 try:
2676 wio = self.io.open(w, **fdopen_kwargs)
2677 t.start()
2678 signal.alarm(1)
2679 # Fill the pipe enough that the write will be blocking.
2680 # It will be interrupted by the timer armed above. Since the
2681 # other thread has read one byte, the low-level write will
2682 # return with a successful (partial) result rather than an EINTR.
2683 # The buffered IO layer must check for pending signal
2684 # handlers, which in this case will invoke alarm_interrupt().
2685 self.assertRaises(ZeroDivisionError,
Charles-François Natali2d517212011-05-29 16:36:44 +02002686 wio.write, item * (support.PIPE_MAX_SIZE // len(item)))
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002687 t.join()
2688 # We got one byte, get another one and check that it isn't a
2689 # repeat of the first one.
2690 read_results.append(os.read(r, 1))
2691 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2692 finally:
2693 os.close(w)
2694 os.close(r)
2695 # This is deliberate. If we didn't close the file descriptor
2696 # before closing wio, wio would try to flush its internal
2697 # buffer, and block again.
2698 try:
2699 wio.close()
2700 except IOError as e:
2701 if e.errno != errno.EBADF:
2702 raise
2703
2704 def test_interrupted_write_unbuffered(self):
2705 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2706
2707 def test_interrupted_write_buffered(self):
2708 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2709
2710 def test_interrupted_write_text(self):
2711 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2712
Brett Cannon31f59292011-02-21 19:29:56 +00002713 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002714 def check_reentrant_write(self, data, **fdopen_kwargs):
2715 def on_alarm(*args):
2716 # Will be called reentrantly from the same thread
2717 wio.write(data)
2718 1/0
2719 signal.signal(signal.SIGALRM, on_alarm)
2720 r, w = os.pipe()
2721 wio = self.io.open(w, **fdopen_kwargs)
2722 try:
2723 signal.alarm(1)
2724 # Either the reentrant call to wio.write() fails with RuntimeError,
2725 # or the signal handler raises ZeroDivisionError.
2726 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2727 while 1:
2728 for i in range(100):
2729 wio.write(data)
2730 wio.flush()
2731 # Make sure the buffer doesn't fill up and block further writes
2732 os.read(r, len(data) * 100)
2733 exc = cm.exception
2734 if isinstance(exc, RuntimeError):
2735 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2736 finally:
2737 wio.close()
2738 os.close(r)
2739
2740 def test_reentrant_write_buffered(self):
2741 self.check_reentrant_write(b"xy", mode="wb")
2742
2743 def test_reentrant_write_text(self):
2744 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2745
Antoine Pitrou707ce822011-02-25 21:24:11 +00002746 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2747 """Check that a buffered read, when it gets interrupted (either
2748 returning a partial result or EINTR), properly invokes the signal
2749 handler and retries if the latter returned successfully."""
2750 r, w = os.pipe()
2751 fdopen_kwargs["closefd"] = False
2752 def alarm_handler(sig, frame):
2753 os.write(w, b"bar")
2754 signal.signal(signal.SIGALRM, alarm_handler)
2755 try:
2756 rio = self.io.open(r, **fdopen_kwargs)
2757 os.write(w, b"foo")
2758 signal.alarm(1)
2759 # Expected behaviour:
2760 # - first raw read() returns partial b"foo"
2761 # - second raw read() returns EINTR
2762 # - third raw read() returns b"bar"
2763 self.assertEqual(decode(rio.read(6)), "foobar")
2764 finally:
2765 rio.close()
2766 os.close(w)
2767 os.close(r)
2768
2769 def test_interrupterd_read_retry_buffered(self):
2770 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2771 mode="rb")
2772
2773 def test_interrupterd_read_retry_text(self):
2774 self.check_interrupted_read_retry(lambda x: x,
2775 mode="r")
2776
2777 @unittest.skipUnless(threading, 'Threading required for this test.')
2778 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2779 """Check that a buffered write, when it gets interrupted (either
2780 returning a partial result or EINTR), properly invokes the signal
2781 handler and retries if the latter returned successfully."""
2782 select = support.import_module("select")
2783 # A quantity that exceeds the buffer size of an anonymous pipe's
2784 # write end.
2785 N = 1024 * 1024
2786 r, w = os.pipe()
2787 fdopen_kwargs["closefd"] = False
2788 # We need a separate thread to read from the pipe and allow the
2789 # write() to finish. This thread is started after the SIGALRM is
2790 # received (forcing a first EINTR in write()).
2791 read_results = []
2792 write_finished = False
2793 def _read():
2794 while not write_finished:
2795 while r in select.select([r], [], [], 1.0)[0]:
2796 s = os.read(r, 1024)
2797 read_results.append(s)
2798 t = threading.Thread(target=_read)
2799 t.daemon = True
2800 def alarm1(sig, frame):
2801 signal.signal(signal.SIGALRM, alarm2)
2802 signal.alarm(1)
2803 def alarm2(sig, frame):
2804 t.start()
2805 signal.signal(signal.SIGALRM, alarm1)
2806 try:
2807 wio = self.io.open(w, **fdopen_kwargs)
2808 signal.alarm(1)
2809 # Expected behaviour:
2810 # - first raw write() is partial (because of the limited pipe buffer
2811 # and the first alarm)
2812 # - second raw write() returns EINTR (because of the second alarm)
2813 # - subsequent write()s are successful (either partial or complete)
2814 self.assertEqual(N, wio.write(item * N))
2815 wio.flush()
2816 write_finished = True
2817 t.join()
2818 self.assertEqual(N, sum(len(x) for x in read_results))
2819 finally:
2820 write_finished = True
2821 os.close(w)
2822 os.close(r)
2823 # This is deliberate. If we didn't close the file descriptor
2824 # before closing wio, wio would try to flush its internal
2825 # buffer, and could block (in case of failure).
2826 try:
2827 wio.close()
2828 except IOError as e:
2829 if e.errno != errno.EBADF:
2830 raise
2831
2832 def test_interrupterd_write_retry_buffered(self):
2833 self.check_interrupted_write_retry(b"x", mode="wb")
2834
2835 def test_interrupterd_write_retry_text(self):
2836 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2837
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002838
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002839class CSignalsTest(SignalsTest):
2840 io = io
2841
2842class PySignalsTest(SignalsTest):
2843 io = pyio
2844
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002845 # Handling reentrancy issues would slow down _pyio even more, so the
2846 # tests are disabled.
2847 test_reentrant_write_buffered = None
2848 test_reentrant_write_text = None
2849
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002850
Guido van Rossum28524c72007-02-27 05:47:44 +00002851def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002852 tests = (CIOTest, PyIOTest,
2853 CBufferedReaderTest, PyBufferedReaderTest,
2854 CBufferedWriterTest, PyBufferedWriterTest,
2855 CBufferedRWPairTest, PyBufferedRWPairTest,
2856 CBufferedRandomTest, PyBufferedRandomTest,
2857 StatefulIncrementalDecoderTest,
2858 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2859 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002860 CMiscIOTest, PyMiscIOTest,
2861 CSignalsTest, PySignalsTest,
2862 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002863
2864 # Put the namespaces of the IO module we are testing and some useful mock
2865 # classes in the __dict__ of each test.
2866 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00002867 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002868 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2869 c_io_ns = {name : getattr(io, name) for name in all_members}
2870 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2871 globs = globals()
2872 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2873 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2874 # Avoid turning open into a bound method.
2875 py_io_ns["open"] = pyio.OpenWrapper
2876 for test in tests:
2877 if test.__name__.startswith("C"):
2878 for name, obj in c_io_ns.items():
2879 setattr(test, name, obj)
2880 elif test.__name__.startswith("Py"):
2881 for name, obj in py_io_ns.items():
2882 setattr(test, name, obj)
2883
2884 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002885
2886if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002887 test_main()