blob: 355a33e48e853afad1fcb37e5a3a6175858c46ec [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Serhiy Storchaka78980432013-01-15 01:12:17 +020035import _testcapi
Antoine Pitrou131a4892012-10-16 22:57:11 +020036from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000038from test import support
Antoine Pitrou712cb732013-12-21 15:51:54 +010039from test.script_helper import assert_python_ok
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000040
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000041import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042import io # C implementation of io
43import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000044try:
45 import threading
46except ImportError:
47 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010048try:
49 import fcntl
50except ImportError:
51 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000052
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000053def _default_chunk_size():
54 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000055 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000056 return f._CHUNK_SIZE
57
58
Antoine Pitrou328ec742010-09-14 18:37:24 +000059class MockRawIOWithoutRead:
60 """A RawIO implementation without read(), so as to exercise the default
61 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000062
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000063 def __init__(self, read_stack=()):
64 self._read_stack = list(read_stack)
65 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000066 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000067 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000068
Guido van Rossum01a27522007-03-07 01:00:12 +000069 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000070 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000071 return len(b)
72
73 def writable(self):
74 return True
75
Guido van Rossum68bbcd22007-02-27 17:19:33 +000076 def fileno(self):
77 return 42
78
79 def readable(self):
80 return True
81
Guido van Rossum01a27522007-03-07 01:00:12 +000082 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000083 return True
84
Guido van Rossum01a27522007-03-07 01:00:12 +000085 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000086 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000087
88 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000089 return 0 # same comment as above
90
91 def readinto(self, buf):
92 self._reads += 1
93 max_len = len(buf)
94 try:
95 data = self._read_stack[0]
96 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000097 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000098 return 0
99 if data is None:
100 del self._read_stack[0]
101 return None
102 n = len(data)
103 if len(data) <= max_len:
104 del self._read_stack[0]
105 buf[:n] = data
106 return n
107 else:
108 buf[:] = data[:max_len]
109 self._read_stack[0] = data[max_len:]
110 return max_len
111
112 def truncate(self, pos=None):
113 return pos
114
Antoine Pitrou328ec742010-09-14 18:37:24 +0000115class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
116 pass
117
118class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
119 pass
120
121
122class MockRawIO(MockRawIOWithoutRead):
123
124 def read(self, n=None):
125 self._reads += 1
126 try:
127 return self._read_stack.pop(0)
128 except:
129 self._extraneous_reads += 1
130 return b""
131
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000132class CMockRawIO(MockRawIO, io.RawIOBase):
133 pass
134
135class PyMockRawIO(MockRawIO, pyio.RawIOBase):
136 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000137
Guido van Rossuma9e20242007-03-08 00:43:48 +0000138
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000139class MisbehavedRawIO(MockRawIO):
140 def write(self, b):
141 return super().write(b) * 2
142
143 def read(self, n=None):
144 return super().read(n) * 2
145
146 def seek(self, pos, whence):
147 return -123
148
149 def tell(self):
150 return -456
151
152 def readinto(self, buf):
153 super().readinto(buf)
154 return len(buf) * 5
155
156class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
157 pass
158
159class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
160 pass
161
162
163class CloseFailureIO(MockRawIO):
164 closed = 0
165
166 def close(self):
167 if not self.closed:
168 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200169 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000170
171class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
172 pass
173
174class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
175 pass
176
177
178class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000179
180 def __init__(self, data):
181 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000182 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000183
184 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000185 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000186 self.read_history.append(None if res is None else len(res))
187 return res
188
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000189 def readinto(self, b):
190 res = super().readinto(b)
191 self.read_history.append(res)
192 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000193
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000194class CMockFileIO(MockFileIO, io.BytesIO):
195 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000196
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000197class PyMockFileIO(MockFileIO, pyio.BytesIO):
198 pass
199
200
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000201class MockUnseekableIO:
202 def seekable(self):
203 return False
204
205 def seek(self, *args):
206 raise self.UnsupportedOperation("not seekable")
207
208 def tell(self, *args):
209 raise self.UnsupportedOperation("not seekable")
210
211class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
212 UnsupportedOperation = io.UnsupportedOperation
213
214class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
215 UnsupportedOperation = pyio.UnsupportedOperation
216
217
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000218class MockNonBlockWriterIO:
219
220 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000221 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000222 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000223
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000224 def pop_written(self):
225 s = b"".join(self._write_stack)
226 self._write_stack[:] = []
227 return s
228
229 def block_on(self, char):
230 """Block when a given char is encountered."""
231 self._blocker_char = char
232
233 def readable(self):
234 return True
235
236 def seekable(self):
237 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000238
Guido van Rossum01a27522007-03-07 01:00:12 +0000239 def writable(self):
240 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000241
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000242 def write(self, b):
243 b = bytes(b)
244 n = -1
245 if self._blocker_char:
246 try:
247 n = b.index(self._blocker_char)
248 except ValueError:
249 pass
250 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100251 if n > 0:
252 # write data up to the first blocker
253 self._write_stack.append(b[:n])
254 return n
255 else:
256 # cancel blocker and indicate would block
257 self._blocker_char = None
258 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000259 self._write_stack.append(b)
260 return len(b)
261
262class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
263 BlockingIOError = io.BlockingIOError
264
265class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
266 BlockingIOError = pyio.BlockingIOError
267
Guido van Rossuma9e20242007-03-08 00:43:48 +0000268
Guido van Rossum28524c72007-02-27 05:47:44 +0000269class IOTest(unittest.TestCase):
270
Neal Norwitze7789b12008-03-24 06:18:09 +0000271 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000272 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000273
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000274 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000275 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000276
Guido van Rossum28524c72007-02-27 05:47:44 +0000277 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000278 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000279 f.truncate(0)
280 self.assertEqual(f.tell(), 5)
281 f.seek(0)
282
283 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.seek(0), 0)
285 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000286 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000287 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000288 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000289 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000290 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000291 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000292 self.assertEqual(f.seek(-1, 2), 13)
293 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000294
Guido van Rossum87429772007-04-10 21:06:59 +0000295 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000296 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000297 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000298
Guido van Rossum9b76da62007-04-11 01:09:03 +0000299 def read_ops(self, f, buffered=False):
300 data = f.read(5)
301 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000302 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000303 self.assertEqual(f.readinto(data), 5)
304 self.assertEqual(data, b" worl")
305 self.assertEqual(f.readinto(data), 2)
306 self.assertEqual(len(data), 5)
307 self.assertEqual(data[:2], b"d\n")
308 self.assertEqual(f.seek(0), 0)
309 self.assertEqual(f.read(20), b"hello world\n")
310 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000311 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000312 self.assertEqual(f.seek(-6, 2), 6)
313 self.assertEqual(f.read(5), b"world")
314 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000315 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000316 self.assertEqual(f.seek(-6, 1), 5)
317 self.assertEqual(f.read(5), b" worl")
318 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000319 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000320 if buffered:
321 f.seek(0)
322 self.assertEqual(f.read(), b"hello world\n")
323 f.seek(6)
324 self.assertEqual(f.read(), b"world\n")
325 self.assertEqual(f.read(), b"")
326
Guido van Rossum34d69e52007-04-10 20:08:41 +0000327 LARGE = 2**31
328
Guido van Rossum53807da2007-04-10 19:01:47 +0000329 def large_file_ops(self, f):
330 assert f.readable()
331 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000332 self.assertEqual(f.seek(self.LARGE), self.LARGE)
333 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000334 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000335 self.assertEqual(f.tell(), self.LARGE + 3)
336 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000337 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000338 self.assertEqual(f.tell(), self.LARGE + 2)
339 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000340 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000341 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000342 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
343 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000344 self.assertEqual(f.read(2), b"x")
345
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000346 def test_invalid_operations(self):
347 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000348 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000349 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000350 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000351 self.assertRaises(exc, fp.read)
352 self.assertRaises(exc, fp.readline)
353 with self.open(support.TESTFN, "wb", buffering=0) as fp:
354 self.assertRaises(exc, fp.read)
355 self.assertRaises(exc, fp.readline)
356 with self.open(support.TESTFN, "rb", buffering=0) as fp:
357 self.assertRaises(exc, fp.write, b"blah")
358 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000359 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000360 self.assertRaises(exc, fp.write, b"blah")
361 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000362 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000363 self.assertRaises(exc, fp.write, "blah")
364 self.assertRaises(exc, fp.writelines, ["blah\n"])
365 # Non-zero seeking from current or end pos
366 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
367 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000368
Antoine Pitrou13348842012-01-29 18:36:34 +0100369 def test_open_handles_NUL_chars(self):
370 fn_with_NUL = 'foo\0bar'
371 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
372 self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
373
Guido van Rossum28524c72007-02-27 05:47:44 +0000374 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000375 with self.open(support.TESTFN, "wb", buffering=0) as f:
376 self.assertEqual(f.readable(), False)
377 self.assertEqual(f.writable(), True)
378 self.assertEqual(f.seekable(), True)
379 self.write_ops(f)
380 with self.open(support.TESTFN, "rb", buffering=0) as f:
381 self.assertEqual(f.readable(), True)
382 self.assertEqual(f.writable(), False)
383 self.assertEqual(f.seekable(), True)
384 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000385
Guido van Rossum87429772007-04-10 21:06:59 +0000386 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000387 with self.open(support.TESTFN, "wb") as f:
388 self.assertEqual(f.readable(), False)
389 self.assertEqual(f.writable(), True)
390 self.assertEqual(f.seekable(), True)
391 self.write_ops(f)
392 with self.open(support.TESTFN, "rb") as f:
393 self.assertEqual(f.readable(), True)
394 self.assertEqual(f.writable(), False)
395 self.assertEqual(f.seekable(), True)
396 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000397
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000398 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000399 with self.open(support.TESTFN, "wb") as f:
400 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
401 with self.open(support.TESTFN, "rb") as f:
402 self.assertEqual(f.readline(), b"abc\n")
403 self.assertEqual(f.readline(10), b"def\n")
404 self.assertEqual(f.readline(2), b"xy")
405 self.assertEqual(f.readline(4), b"zzy\n")
406 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000407 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000408 self.assertRaises(TypeError, f.readline, 5.3)
409 with self.open(support.TESTFN, "r") as f:
410 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000411
Guido van Rossum28524c72007-02-27 05:47:44 +0000412 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000413 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000414 self.write_ops(f)
415 data = f.getvalue()
416 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000417 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000418 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000419
Guido van Rossum53807da2007-04-10 19:01:47 +0000420 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000421 # On Windows and Mac OSX this test comsumes large resources; It takes
422 # a long time to build the >2GB file and takes >2GB of disk space
423 # therefore the resource must be enabled to run this test.
424 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600425 support.requires(
426 'largefile',
427 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000428 with self.open(support.TESTFN, "w+b", 0) as f:
429 self.large_file_ops(f)
430 with self.open(support.TESTFN, "w+b") as f:
431 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000432
433 def test_with_open(self):
434 for bufsize in (0, 1, 100):
435 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000436 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000437 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000438 self.assertEqual(f.closed, True)
439 f = None
440 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000441 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000442 1/0
443 except ZeroDivisionError:
444 self.assertEqual(f.closed, True)
445 else:
446 self.fail("1/0 didn't raise an exception")
447
Antoine Pitrou08838b62009-01-21 00:55:13 +0000448 # issue 5008
449 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000450 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000451 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000452 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000453 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000454 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000455 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000456 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000457 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000458
Guido van Rossum87429772007-04-10 21:06:59 +0000459 def test_destructor(self):
460 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000461 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000462 def __del__(self):
463 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000464 try:
465 f = super().__del__
466 except AttributeError:
467 pass
468 else:
469 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000470 def close(self):
471 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000472 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000473 def flush(self):
474 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000475 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000476 with support.check_warnings(('', ResourceWarning)):
477 f = MyFileIO(support.TESTFN, "wb")
478 f.write(b"xxx")
479 del f
480 support.gc_collect()
481 self.assertEqual(record, [1, 2, 3])
482 with self.open(support.TESTFN, "rb") as f:
483 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000484
485 def _check_base_destructor(self, base):
486 record = []
487 class MyIO(base):
488 def __init__(self):
489 # This exercises the availability of attributes on object
490 # destruction.
491 # (in the C version, close() is called by the tp_dealloc
492 # function, not by __del__)
493 self.on_del = 1
494 self.on_close = 2
495 self.on_flush = 3
496 def __del__(self):
497 record.append(self.on_del)
498 try:
499 f = super().__del__
500 except AttributeError:
501 pass
502 else:
503 f()
504 def close(self):
505 record.append(self.on_close)
506 super().close()
507 def flush(self):
508 record.append(self.on_flush)
509 super().flush()
510 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000511 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000512 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000513 self.assertEqual(record, [1, 2, 3])
514
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000515 def test_IOBase_destructor(self):
516 self._check_base_destructor(self.IOBase)
517
518 def test_RawIOBase_destructor(self):
519 self._check_base_destructor(self.RawIOBase)
520
521 def test_BufferedIOBase_destructor(self):
522 self._check_base_destructor(self.BufferedIOBase)
523
524 def test_TextIOBase_destructor(self):
525 self._check_base_destructor(self.TextIOBase)
526
Guido van Rossum87429772007-04-10 21:06:59 +0000527 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000528 with self.open(support.TESTFN, "wb") as f:
529 f.write(b"xxx")
530 with self.open(support.TESTFN, "rb") as f:
531 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000532
Guido van Rossumd4103952007-04-12 05:44:49 +0000533 def test_array_writes(self):
534 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000535 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000536 with self.open(support.TESTFN, "wb", 0) as f:
537 self.assertEqual(f.write(a), n)
538 with self.open(support.TESTFN, "wb") as f:
539 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000540
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000541 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000542 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000543 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000544
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000545 def test_read_closed(self):
546 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000547 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000548 with self.open(support.TESTFN, "r") as f:
549 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000550 self.assertEqual(file.read(), "egg\n")
551 file.seek(0)
552 file.close()
553 self.assertRaises(ValueError, file.read)
554
555 def test_no_closefd_with_filename(self):
556 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000557 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000558
559 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000560 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000561 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000562 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000563 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000564 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000565 self.assertEqual(file.buffer.raw.closefd, False)
566
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000567 def test_garbage_collection(self):
568 # FileIO objects are collected, and collecting them flushes
569 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000570 with support.check_warnings(('', ResourceWarning)):
571 f = self.FileIO(support.TESTFN, "wb")
572 f.write(b"abcxxx")
573 f.f = f
574 wr = weakref.ref(f)
575 del f
576 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000577 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000578 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000579 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000580
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000581 def test_unbounded_file(self):
582 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
583 zero = "/dev/zero"
584 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000585 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000586 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000587 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000588 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000589 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000590 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000591 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000592 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000593 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000594 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000595 self.assertRaises(OverflowError, f.read)
596
Antoine Pitrou6be88762010-05-03 16:48:20 +0000597 def test_flush_error_on_close(self):
598 f = self.open(support.TESTFN, "wb", buffering=0)
599 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200600 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000601 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200602 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600603 self.assertTrue(f.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000604
605 def test_multi_close(self):
606 f = self.open(support.TESTFN, "wb", buffering=0)
607 f.close()
608 f.close()
609 f.close()
610 self.assertRaises(ValueError, f.flush)
611
Antoine Pitrou328ec742010-09-14 18:37:24 +0000612 def test_RawIOBase_read(self):
613 # Exercise the default RawIOBase.read() implementation (which calls
614 # readinto() internally).
615 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
616 self.assertEqual(rawio.read(2), b"ab")
617 self.assertEqual(rawio.read(2), b"c")
618 self.assertEqual(rawio.read(2), b"d")
619 self.assertEqual(rawio.read(2), None)
620 self.assertEqual(rawio.read(2), b"ef")
621 self.assertEqual(rawio.read(2), b"g")
622 self.assertEqual(rawio.read(2), None)
623 self.assertEqual(rawio.read(2), b"")
624
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400625 def test_types_have_dict(self):
626 test = (
627 self.IOBase(),
628 self.RawIOBase(),
629 self.TextIOBase(),
630 self.StringIO(),
631 self.BytesIO()
632 )
633 for obj in test:
634 self.assertTrue(hasattr(obj, "__dict__"))
635
Ross Lagerwall59142db2011-10-31 20:34:46 +0200636 def test_opener(self):
637 with self.open(support.TESTFN, "w") as f:
638 f.write("egg\n")
639 fd = os.open(support.TESTFN, os.O_RDONLY)
640 def opener(path, flags):
641 return fd
642 with self.open("non-existent", "r", opener=opener) as f:
643 self.assertEqual(f.read(), "egg\n")
644
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200645 def test_fileio_closefd(self):
646 # Issue #4841
647 with self.open(__file__, 'rb') as f1, \
648 self.open(__file__, 'rb') as f2:
649 fileio = self.FileIO(f1.fileno(), closefd=False)
650 # .__init__() must not close f1
651 fileio.__init__(f2.fileno(), closefd=False)
652 f1.readline()
653 # .close() must not close f2
654 fileio.close()
655 f2.readline()
656
657
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000658class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200659
660 def test_IOBase_finalize(self):
661 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
662 # class which inherits IOBase and an object of this class are caught
663 # in a reference cycle and close() is already in the method cache.
664 class MyIO(self.IOBase):
665 def close(self):
666 pass
667
668 # create an instance to populate the method cache
669 MyIO()
670 obj = MyIO()
671 obj.obj = obj
672 wr = weakref.ref(obj)
673 del MyIO
674 del obj
675 support.gc_collect()
676 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000677
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000678class PyIOTest(IOTest):
679 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000680
Guido van Rossuma9e20242007-03-08 00:43:48 +0000681
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000682class CommonBufferedTests:
683 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
684
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000685 def test_detach(self):
686 raw = self.MockRawIO()
687 buf = self.tp(raw)
688 self.assertIs(buf.detach(), raw)
689 self.assertRaises(ValueError, buf.detach)
690
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000691 def test_fileno(self):
692 rawio = self.MockRawIO()
693 bufio = self.tp(rawio)
694
Ezio Melottib3aedd42010-11-20 19:04:17 +0000695 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000696
Zachary Ware9fe6d862013-12-08 00:20:35 -0600697 @unittest.skip('test having existential crisis')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000698 def test_no_fileno(self):
699 # XXX will we always have fileno() function? If so, kill
700 # this test. Else, write it.
701 pass
702
703 def test_invalid_args(self):
704 rawio = self.MockRawIO()
705 bufio = self.tp(rawio)
706 # Invalid whence
707 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200708 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000709
710 def test_override_destructor(self):
711 tp = self.tp
712 record = []
713 class MyBufferedIO(tp):
714 def __del__(self):
715 record.append(1)
716 try:
717 f = super().__del__
718 except AttributeError:
719 pass
720 else:
721 f()
722 def close(self):
723 record.append(2)
724 super().close()
725 def flush(self):
726 record.append(3)
727 super().flush()
728 rawio = self.MockRawIO()
729 bufio = MyBufferedIO(rawio)
730 writable = bufio.writable()
731 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000732 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000733 if writable:
734 self.assertEqual(record, [1, 2, 3])
735 else:
736 self.assertEqual(record, [1, 2])
737
738 def test_context_manager(self):
739 # Test usability as a context manager
740 rawio = self.MockRawIO()
741 bufio = self.tp(rawio)
742 def _with():
743 with bufio:
744 pass
745 _with()
746 # bufio should now be closed, and using it a second time should raise
747 # a ValueError.
748 self.assertRaises(ValueError, _with)
749
750 def test_error_through_destructor(self):
751 # Test that the exception state is not modified by a destructor,
752 # even if close() fails.
753 rawio = self.CloseFailureIO()
754 def f():
755 self.tp(rawio).xyzzy
756 with support.captured_output("stderr") as s:
757 self.assertRaises(AttributeError, f)
758 s = s.getvalue().strip()
759 if s:
760 # The destructor *may* have printed an unraisable error, check it
761 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200762 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000763 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000764
Antoine Pitrou716c4442009-05-23 19:04:03 +0000765 def test_repr(self):
766 raw = self.MockRawIO()
767 b = self.tp(raw)
768 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
769 self.assertEqual(repr(b), "<%s>" % clsname)
770 raw.name = "dummy"
771 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
772 raw.name = b"dummy"
773 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
774
Antoine Pitrou6be88762010-05-03 16:48:20 +0000775 def test_flush_error_on_close(self):
776 raw = self.MockRawIO()
777 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200778 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000779 raw.flush = bad_flush
780 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200781 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600782 self.assertTrue(b.closed)
783
784 def test_close_error_on_close(self):
785 raw = self.MockRawIO()
786 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200787 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -0600788 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200789 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -0600790 raw.close = bad_close
791 b = self.tp(raw)
792 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200793 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600794 b.close()
795 self.assertEqual(err.exception.args, ('close',))
796 self.assertEqual(err.exception.__context__.args, ('flush',))
797 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000798
799 def test_multi_close(self):
800 raw = self.MockRawIO()
801 b = self.tp(raw)
802 b.close()
803 b.close()
804 b.close()
805 self.assertRaises(ValueError, b.flush)
806
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000807 def test_unseekable(self):
808 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
809 self.assertRaises(self.UnsupportedOperation, bufio.tell)
810 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
811
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000812 def test_readonly_attributes(self):
813 raw = self.MockRawIO()
814 buf = self.tp(raw)
815 x = self.MockRawIO()
816 with self.assertRaises(AttributeError):
817 buf.raw = x
818
Guido van Rossum78892e42007-04-06 17:31:18 +0000819
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200820class SizeofTest:
821
822 @support.cpython_only
823 def test_sizeof(self):
824 bufsize1 = 4096
825 bufsize2 = 8192
826 rawio = self.MockRawIO()
827 bufio = self.tp(rawio, buffer_size=bufsize1)
828 size = sys.getsizeof(bufio) - bufsize1
829 rawio = self.MockRawIO()
830 bufio = self.tp(rawio, buffer_size=bufsize2)
831 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
832
Jesus Ceadc469452012-10-04 12:37:56 +0200833 @support.cpython_only
834 def test_buffer_freeing(self) :
835 bufsize = 4096
836 rawio = self.MockRawIO()
837 bufio = self.tp(rawio, buffer_size=bufsize)
838 size = sys.getsizeof(bufio) - bufsize
839 bufio.close()
840 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200841
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000842class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
843 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000844
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000845 def test_constructor(self):
846 rawio = self.MockRawIO([b"abc"])
847 bufio = self.tp(rawio)
848 bufio.__init__(rawio)
849 bufio.__init__(rawio, buffer_size=1024)
850 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000851 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000852 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
853 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
854 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
855 rawio = self.MockRawIO([b"abc"])
856 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000857 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000858
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000859 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000860 for arg in (None, 7):
861 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
862 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000863 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000864 # Invalid args
865 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000866
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000867 def test_read1(self):
868 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
869 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000870 self.assertEqual(b"a", bufio.read(1))
871 self.assertEqual(b"b", bufio.read1(1))
872 self.assertEqual(rawio._reads, 1)
873 self.assertEqual(b"c", bufio.read1(100))
874 self.assertEqual(rawio._reads, 1)
875 self.assertEqual(b"d", bufio.read1(100))
876 self.assertEqual(rawio._reads, 2)
877 self.assertEqual(b"efg", bufio.read1(100))
878 self.assertEqual(rawio._reads, 3)
879 self.assertEqual(b"", bufio.read1(100))
880 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000881 # Invalid args
882 self.assertRaises(ValueError, bufio.read1, -1)
883
884 def test_readinto(self):
885 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
886 bufio = self.tp(rawio)
887 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000888 self.assertEqual(bufio.readinto(b), 2)
889 self.assertEqual(b, b"ab")
890 self.assertEqual(bufio.readinto(b), 2)
891 self.assertEqual(b, b"cd")
892 self.assertEqual(bufio.readinto(b), 2)
893 self.assertEqual(b, b"ef")
894 self.assertEqual(bufio.readinto(b), 1)
895 self.assertEqual(b, b"gf")
896 self.assertEqual(bufio.readinto(b), 0)
897 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +0200898 rawio = self.MockRawIO((b"abc", None))
899 bufio = self.tp(rawio)
900 self.assertEqual(bufio.readinto(b), 2)
901 self.assertEqual(b, b"ab")
902 self.assertEqual(bufio.readinto(b), 1)
903 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000904
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000905 def test_readlines(self):
906 def bufio():
907 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
908 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000909 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
910 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
911 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000912
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000913 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000914 data = b"abcdefghi"
915 dlen = len(data)
916
917 tests = [
918 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
919 [ 100, [ 3, 3, 3], [ dlen ] ],
920 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
921 ]
922
923 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000924 rawio = self.MockFileIO(data)
925 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000926 pos = 0
927 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000928 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000929 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000930 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000931 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000932
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000933 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000934 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000935 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
936 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000937 self.assertEqual(b"abcd", bufio.read(6))
938 self.assertEqual(b"e", bufio.read(1))
939 self.assertEqual(b"fg", bufio.read())
940 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200941 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000942 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000943
Victor Stinnera80987f2011-05-25 22:47:16 +0200944 rawio = self.MockRawIO((b"a", None, None))
945 self.assertEqual(b"a", rawio.readall())
946 self.assertIsNone(rawio.readall())
947
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000948 def test_read_past_eof(self):
949 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
950 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000951
Ezio Melottib3aedd42010-11-20 19:04:17 +0000952 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000953
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000954 def test_read_all(self):
955 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
956 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000957
Ezio Melottib3aedd42010-11-20 19:04:17 +0000958 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000959
Victor Stinner45df8202010-04-28 22:31:17 +0000960 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000961 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000962 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000963 try:
964 # Write out many bytes with exactly the same number of 0's,
965 # 1's... 255's. This will help us check that concurrent reading
966 # doesn't duplicate or forget contents.
967 N = 1000
968 l = list(range(256)) * N
969 random.shuffle(l)
970 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000971 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000972 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000973 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000974 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000975 errors = []
976 results = []
977 def f():
978 try:
979 # Intra-buffer read then buffer-flushing read
980 for n in cycle([1, 19]):
981 s = bufio.read(n)
982 if not s:
983 break
984 # list.append() is atomic
985 results.append(s)
986 except Exception as e:
987 errors.append(e)
988 raise
989 threads = [threading.Thread(target=f) for x in range(20)]
990 for t in threads:
991 t.start()
992 time.sleep(0.02) # yield
993 for t in threads:
994 t.join()
995 self.assertFalse(errors,
996 "the following exceptions were caught: %r" % errors)
997 s = b''.join(results)
998 for i in range(256):
999 c = bytes(bytearray([i]))
1000 self.assertEqual(s.count(c), N)
1001 finally:
1002 support.unlink(support.TESTFN)
1003
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001004 def test_unseekable(self):
1005 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1006 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1007 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1008 bufio.read(1)
1009 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1010 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1011
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001012 def test_misbehaved_io(self):
1013 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1014 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001015 self.assertRaises(OSError, bufio.seek, 0)
1016 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001017
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001018 def test_no_extraneous_read(self):
1019 # Issue #9550; when the raw IO object has satisfied the read request,
1020 # we should not issue any additional reads, otherwise it may block
1021 # (e.g. socket).
1022 bufsize = 16
1023 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1024 rawio = self.MockRawIO([b"x" * n])
1025 bufio = self.tp(rawio, bufsize)
1026 self.assertEqual(bufio.read(n), b"x" * n)
1027 # Simple case: one raw read is enough to satisfy the request.
1028 self.assertEqual(rawio._extraneous_reads, 0,
1029 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1030 # A more complex case where two raw reads are needed to satisfy
1031 # the request.
1032 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1033 bufio = self.tp(rawio, bufsize)
1034 self.assertEqual(bufio.read(n), b"x" * n)
1035 self.assertEqual(rawio._extraneous_reads, 0,
1036 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1037
1038
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001039class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001040 tp = io.BufferedReader
1041
1042 def test_constructor(self):
1043 BufferedReaderTest.test_constructor(self)
1044 # The allocation can succeed on 32-bit builds, e.g. with more
1045 # than 2GB RAM and a 64-bit kernel.
1046 if sys.maxsize > 0x7FFFFFFF:
1047 rawio = self.MockRawIO()
1048 bufio = self.tp(rawio)
1049 self.assertRaises((OverflowError, MemoryError, ValueError),
1050 bufio.__init__, rawio, sys.maxsize)
1051
1052 def test_initialization(self):
1053 rawio = self.MockRawIO([b"abc"])
1054 bufio = self.tp(rawio)
1055 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1056 self.assertRaises(ValueError, bufio.read)
1057 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1058 self.assertRaises(ValueError, bufio.read)
1059 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1060 self.assertRaises(ValueError, bufio.read)
1061
1062 def test_misbehaved_io_read(self):
1063 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1064 bufio = self.tp(rawio)
1065 # _pyio.BufferedReader seems to implement reading different, so that
1066 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001067 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001068
1069 def test_garbage_collection(self):
1070 # C BufferedReader objects are collected.
1071 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001072 with support.check_warnings(('', ResourceWarning)):
1073 rawio = self.FileIO(support.TESTFN, "w+b")
1074 f = self.tp(rawio)
1075 f.f = f
1076 wr = weakref.ref(f)
1077 del f
1078 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001079 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001080
R David Murray67bfe802013-02-23 21:51:05 -05001081 def test_args_error(self):
1082 # Issue #17275
1083 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1084 self.tp(io.BytesIO(), 1024, 1024, 1024)
1085
1086
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001087class PyBufferedReaderTest(BufferedReaderTest):
1088 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001089
Guido van Rossuma9e20242007-03-08 00:43:48 +00001090
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001091class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1092 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001093
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001094 def test_constructor(self):
1095 rawio = self.MockRawIO()
1096 bufio = self.tp(rawio)
1097 bufio.__init__(rawio)
1098 bufio.__init__(rawio, buffer_size=1024)
1099 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001100 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001101 bufio.flush()
1102 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1103 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1104 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1105 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001106 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001107 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001108 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001109
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001110 def test_detach_flush(self):
1111 raw = self.MockRawIO()
1112 buf = self.tp(raw)
1113 buf.write(b"howdy!")
1114 self.assertFalse(raw._write_stack)
1115 buf.detach()
1116 self.assertEqual(raw._write_stack, [b"howdy!"])
1117
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001118 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001119 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001120 writer = self.MockRawIO()
1121 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001122 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001123 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001124
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001125 def test_write_overflow(self):
1126 writer = self.MockRawIO()
1127 bufio = self.tp(writer, 8)
1128 contents = b"abcdefghijklmnop"
1129 for n in range(0, len(contents), 3):
1130 bufio.write(contents[n:n+3])
1131 flushed = b"".join(writer._write_stack)
1132 # At least (total - 8) bytes were implicitly flushed, perhaps more
1133 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001134 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001135
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001136 def check_writes(self, intermediate_func):
1137 # Lots of writes, test the flushed output is as expected.
1138 contents = bytes(range(256)) * 1000
1139 n = 0
1140 writer = self.MockRawIO()
1141 bufio = self.tp(writer, 13)
1142 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1143 def gen_sizes():
1144 for size in count(1):
1145 for i in range(15):
1146 yield size
1147 sizes = gen_sizes()
1148 while n < len(contents):
1149 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001150 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001151 intermediate_func(bufio)
1152 n += size
1153 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001154 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001155
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001156 def test_writes(self):
1157 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001158
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001159 def test_writes_and_flushes(self):
1160 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001161
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001162 def test_writes_and_seeks(self):
1163 def _seekabs(bufio):
1164 pos = bufio.tell()
1165 bufio.seek(pos + 1, 0)
1166 bufio.seek(pos - 1, 0)
1167 bufio.seek(pos, 0)
1168 self.check_writes(_seekabs)
1169 def _seekrel(bufio):
1170 pos = bufio.seek(0, 1)
1171 bufio.seek(+1, 1)
1172 bufio.seek(-1, 1)
1173 bufio.seek(pos, 0)
1174 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001175
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001176 def test_writes_and_truncates(self):
1177 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001178
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001179 def test_write_non_blocking(self):
1180 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001181 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001182
Ezio Melottib3aedd42010-11-20 19:04:17 +00001183 self.assertEqual(bufio.write(b"abcd"), 4)
1184 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001185 # 1 byte will be written, the rest will be buffered
1186 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001187 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001188
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001189 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1190 raw.block_on(b"0")
1191 try:
1192 bufio.write(b"opqrwxyz0123456789")
1193 except self.BlockingIOError as e:
1194 written = e.characters_written
1195 else:
1196 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001197 self.assertEqual(written, 16)
1198 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001199 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001200
Ezio Melottib3aedd42010-11-20 19:04:17 +00001201 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001202 s = raw.pop_written()
1203 # Previously buffered bytes were flushed
1204 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001205
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001206 def test_write_and_rewind(self):
1207 raw = io.BytesIO()
1208 bufio = self.tp(raw, 4)
1209 self.assertEqual(bufio.write(b"abcdef"), 6)
1210 self.assertEqual(bufio.tell(), 6)
1211 bufio.seek(0, 0)
1212 self.assertEqual(bufio.write(b"XY"), 2)
1213 bufio.seek(6, 0)
1214 self.assertEqual(raw.getvalue(), b"XYcdef")
1215 self.assertEqual(bufio.write(b"123456"), 6)
1216 bufio.flush()
1217 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001218
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001219 def test_flush(self):
1220 writer = self.MockRawIO()
1221 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001222 bufio.write(b"abc")
1223 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001224 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001225
Antoine Pitrou131a4892012-10-16 22:57:11 +02001226 def test_writelines(self):
1227 l = [b'ab', b'cd', b'ef']
1228 writer = self.MockRawIO()
1229 bufio = self.tp(writer, 8)
1230 bufio.writelines(l)
1231 bufio.flush()
1232 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1233
1234 def test_writelines_userlist(self):
1235 l = UserList([b'ab', b'cd', b'ef'])
1236 writer = self.MockRawIO()
1237 bufio = self.tp(writer, 8)
1238 bufio.writelines(l)
1239 bufio.flush()
1240 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1241
1242 def test_writelines_error(self):
1243 writer = self.MockRawIO()
1244 bufio = self.tp(writer, 8)
1245 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1246 self.assertRaises(TypeError, bufio.writelines, None)
1247 self.assertRaises(TypeError, bufio.writelines, 'abc')
1248
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001249 def test_destructor(self):
1250 writer = self.MockRawIO()
1251 bufio = self.tp(writer, 8)
1252 bufio.write(b"abc")
1253 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001254 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001255 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001256
1257 def test_truncate(self):
1258 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001259 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001260 bufio = self.tp(raw, 8)
1261 bufio.write(b"abcdef")
1262 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001263 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001264 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001265 self.assertEqual(f.read(), b"abc")
1266
Victor Stinner45df8202010-04-28 22:31:17 +00001267 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001268 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001269 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001270 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001271 # Write out many bytes from many threads and test they were
1272 # all flushed.
1273 N = 1000
1274 contents = bytes(range(256)) * N
1275 sizes = cycle([1, 19])
1276 n = 0
1277 queue = deque()
1278 while n < len(contents):
1279 size = next(sizes)
1280 queue.append(contents[n:n+size])
1281 n += size
1282 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001283 # We use a real file object because it allows us to
1284 # exercise situations where the GIL is released before
1285 # writing the buffer to the raw streams. This is in addition
1286 # to concurrency issues due to switching threads in the middle
1287 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001288 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001289 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001290 errors = []
1291 def f():
1292 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001293 while True:
1294 try:
1295 s = queue.popleft()
1296 except IndexError:
1297 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001298 bufio.write(s)
1299 except Exception as e:
1300 errors.append(e)
1301 raise
1302 threads = [threading.Thread(target=f) for x in range(20)]
1303 for t in threads:
1304 t.start()
1305 time.sleep(0.02) # yield
1306 for t in threads:
1307 t.join()
1308 self.assertFalse(errors,
1309 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001310 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001311 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001312 s = f.read()
1313 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001314 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001315 finally:
1316 support.unlink(support.TESTFN)
1317
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001318 def test_misbehaved_io(self):
1319 rawio = self.MisbehavedRawIO()
1320 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001321 self.assertRaises(OSError, bufio.seek, 0)
1322 self.assertRaises(OSError, bufio.tell)
1323 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001324
Florent Xicluna109d5732012-07-07 17:03:22 +02001325 def test_max_buffer_size_removal(self):
1326 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001327 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001328
Benjamin Peterson68623612012-12-20 11:53:11 -06001329 def test_write_error_on_close(self):
1330 raw = self.MockRawIO()
1331 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001332 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001333 raw.write = bad_write
1334 b = self.tp(raw)
1335 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001336 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001337 self.assertTrue(b.closed)
1338
Benjamin Peterson59406a92009-03-26 17:10:29 +00001339
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001340class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001341 tp = io.BufferedWriter
1342
1343 def test_constructor(self):
1344 BufferedWriterTest.test_constructor(self)
1345 # The allocation can succeed on 32-bit builds, e.g. with more
1346 # than 2GB RAM and a 64-bit kernel.
1347 if sys.maxsize > 0x7FFFFFFF:
1348 rawio = self.MockRawIO()
1349 bufio = self.tp(rawio)
1350 self.assertRaises((OverflowError, MemoryError, ValueError),
1351 bufio.__init__, rawio, sys.maxsize)
1352
1353 def test_initialization(self):
1354 rawio = self.MockRawIO()
1355 bufio = self.tp(rawio)
1356 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1357 self.assertRaises(ValueError, bufio.write, b"def")
1358 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1359 self.assertRaises(ValueError, bufio.write, b"def")
1360 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1361 self.assertRaises(ValueError, bufio.write, b"def")
1362
1363 def test_garbage_collection(self):
1364 # C BufferedWriter objects are collected, and collecting them flushes
1365 # all data to disk.
1366 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001367 with support.check_warnings(('', ResourceWarning)):
1368 rawio = self.FileIO(support.TESTFN, "w+b")
1369 f = self.tp(rawio)
1370 f.write(b"123xxx")
1371 f.x = f
1372 wr = weakref.ref(f)
1373 del f
1374 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001375 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001376 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001377 self.assertEqual(f.read(), b"123xxx")
1378
R David Murray67bfe802013-02-23 21:51:05 -05001379 def test_args_error(self):
1380 # Issue #17275
1381 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1382 self.tp(io.BytesIO(), 1024, 1024, 1024)
1383
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001384
1385class PyBufferedWriterTest(BufferedWriterTest):
1386 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001387
Guido van Rossum01a27522007-03-07 01:00:12 +00001388class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001389
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001390 def test_constructor(self):
1391 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001392 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001393
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001394 def test_detach(self):
1395 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1396 self.assertRaises(self.UnsupportedOperation, pair.detach)
1397
Florent Xicluna109d5732012-07-07 17:03:22 +02001398 def test_constructor_max_buffer_size_removal(self):
1399 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001400 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001401
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001402 def test_constructor_with_not_readable(self):
1403 class NotReadable(MockRawIO):
1404 def readable(self):
1405 return False
1406
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001407 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001408
1409 def test_constructor_with_not_writeable(self):
1410 class NotWriteable(MockRawIO):
1411 def writable(self):
1412 return False
1413
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001414 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001415
1416 def test_read(self):
1417 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1418
1419 self.assertEqual(pair.read(3), b"abc")
1420 self.assertEqual(pair.read(1), b"d")
1421 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001422 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1423 self.assertEqual(pair.read(None), b"abc")
1424
1425 def test_readlines(self):
1426 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1427 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1428 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1429 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001430
1431 def test_read1(self):
1432 # .read1() is delegated to the underlying reader object, so this test
1433 # can be shallow.
1434 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1435
1436 self.assertEqual(pair.read1(3), b"abc")
1437
1438 def test_readinto(self):
1439 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1440
1441 data = bytearray(5)
1442 self.assertEqual(pair.readinto(data), 5)
1443 self.assertEqual(data, b"abcde")
1444
1445 def test_write(self):
1446 w = self.MockRawIO()
1447 pair = self.tp(self.MockRawIO(), w)
1448
1449 pair.write(b"abc")
1450 pair.flush()
1451 pair.write(b"def")
1452 pair.flush()
1453 self.assertEqual(w._write_stack, [b"abc", b"def"])
1454
1455 def test_peek(self):
1456 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1457
1458 self.assertTrue(pair.peek(3).startswith(b"abc"))
1459 self.assertEqual(pair.read(3), b"abc")
1460
1461 def test_readable(self):
1462 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1463 self.assertTrue(pair.readable())
1464
1465 def test_writeable(self):
1466 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1467 self.assertTrue(pair.writable())
1468
1469 def test_seekable(self):
1470 # BufferedRWPairs are never seekable, even if their readers and writers
1471 # are.
1472 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1473 self.assertFalse(pair.seekable())
1474
1475 # .flush() is delegated to the underlying writer object and has been
1476 # tested in the test_write method.
1477
1478 def test_close_and_closed(self):
1479 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1480 self.assertFalse(pair.closed)
1481 pair.close()
1482 self.assertTrue(pair.closed)
1483
1484 def test_isatty(self):
1485 class SelectableIsAtty(MockRawIO):
1486 def __init__(self, isatty):
1487 MockRawIO.__init__(self)
1488 self._isatty = isatty
1489
1490 def isatty(self):
1491 return self._isatty
1492
1493 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1494 self.assertFalse(pair.isatty())
1495
1496 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1497 self.assertTrue(pair.isatty())
1498
1499 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1500 self.assertTrue(pair.isatty())
1501
1502 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1503 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001504
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001505class CBufferedRWPairTest(BufferedRWPairTest):
1506 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001507
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001508class PyBufferedRWPairTest(BufferedRWPairTest):
1509 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001510
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001511
1512class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1513 read_mode = "rb+"
1514 write_mode = "wb+"
1515
1516 def test_constructor(self):
1517 BufferedReaderTest.test_constructor(self)
1518 BufferedWriterTest.test_constructor(self)
1519
1520 def test_read_and_write(self):
1521 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001522 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001523
1524 self.assertEqual(b"as", rw.read(2))
1525 rw.write(b"ddd")
1526 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001527 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001528 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001529 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001530
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001531 def test_seek_and_tell(self):
1532 raw = self.BytesIO(b"asdfghjkl")
1533 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001534
Ezio Melottib3aedd42010-11-20 19:04:17 +00001535 self.assertEqual(b"as", rw.read(2))
1536 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001537 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001538 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001539
Antoine Pitroue05565e2011-08-20 14:39:23 +02001540 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001541 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001542 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001543 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001544 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001545 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001546 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001547 self.assertEqual(7, rw.tell())
1548 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001549 rw.flush()
1550 self.assertEqual(b"asdf123fl", raw.getvalue())
1551
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001552 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001553
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001554 def check_flush_and_read(self, read_func):
1555 raw = self.BytesIO(b"abcdefghi")
1556 bufio = self.tp(raw)
1557
Ezio Melottib3aedd42010-11-20 19:04:17 +00001558 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001559 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001560 self.assertEqual(b"ef", read_func(bufio, 2))
1561 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001562 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001563 self.assertEqual(6, bufio.tell())
1564 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001565 raw.seek(0, 0)
1566 raw.write(b"XYZ")
1567 # flush() resets the read buffer
1568 bufio.flush()
1569 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001570 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001571
1572 def test_flush_and_read(self):
1573 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1574
1575 def test_flush_and_readinto(self):
1576 def _readinto(bufio, n=-1):
1577 b = bytearray(n if n >= 0 else 9999)
1578 n = bufio.readinto(b)
1579 return bytes(b[:n])
1580 self.check_flush_and_read(_readinto)
1581
1582 def test_flush_and_peek(self):
1583 def _peek(bufio, n=-1):
1584 # This relies on the fact that the buffer can contain the whole
1585 # raw stream, otherwise peek() can return less.
1586 b = bufio.peek(n)
1587 if n != -1:
1588 b = b[:n]
1589 bufio.seek(len(b), 1)
1590 return b
1591 self.check_flush_and_read(_peek)
1592
1593 def test_flush_and_write(self):
1594 raw = self.BytesIO(b"abcdefghi")
1595 bufio = self.tp(raw)
1596
1597 bufio.write(b"123")
1598 bufio.flush()
1599 bufio.write(b"45")
1600 bufio.flush()
1601 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001602 self.assertEqual(b"12345fghi", raw.getvalue())
1603 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001604
1605 def test_threads(self):
1606 BufferedReaderTest.test_threads(self)
1607 BufferedWriterTest.test_threads(self)
1608
1609 def test_writes_and_peek(self):
1610 def _peek(bufio):
1611 bufio.peek(1)
1612 self.check_writes(_peek)
1613 def _peek(bufio):
1614 pos = bufio.tell()
1615 bufio.seek(-1, 1)
1616 bufio.peek(1)
1617 bufio.seek(pos, 0)
1618 self.check_writes(_peek)
1619
1620 def test_writes_and_reads(self):
1621 def _read(bufio):
1622 bufio.seek(-1, 1)
1623 bufio.read(1)
1624 self.check_writes(_read)
1625
1626 def test_writes_and_read1s(self):
1627 def _read1(bufio):
1628 bufio.seek(-1, 1)
1629 bufio.read1(1)
1630 self.check_writes(_read1)
1631
1632 def test_writes_and_readintos(self):
1633 def _read(bufio):
1634 bufio.seek(-1, 1)
1635 bufio.readinto(bytearray(1))
1636 self.check_writes(_read)
1637
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001638 def test_write_after_readahead(self):
1639 # Issue #6629: writing after the buffer was filled by readahead should
1640 # first rewind the raw stream.
1641 for overwrite_size in [1, 5]:
1642 raw = self.BytesIO(b"A" * 10)
1643 bufio = self.tp(raw, 4)
1644 # Trigger readahead
1645 self.assertEqual(bufio.read(1), b"A")
1646 self.assertEqual(bufio.tell(), 1)
1647 # Overwriting should rewind the raw stream if it needs so
1648 bufio.write(b"B" * overwrite_size)
1649 self.assertEqual(bufio.tell(), overwrite_size + 1)
1650 # If the write size was smaller than the buffer size, flush() and
1651 # check that rewind happens.
1652 bufio.flush()
1653 self.assertEqual(bufio.tell(), overwrite_size + 1)
1654 s = raw.getvalue()
1655 self.assertEqual(s,
1656 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1657
Antoine Pitrou7c404892011-05-13 00:13:33 +02001658 def test_write_rewind_write(self):
1659 # Various combinations of reading / writing / seeking backwards / writing again
1660 def mutate(bufio, pos1, pos2):
1661 assert pos2 >= pos1
1662 # Fill the buffer
1663 bufio.seek(pos1)
1664 bufio.read(pos2 - pos1)
1665 bufio.write(b'\x02')
1666 # This writes earlier than the previous write, but still inside
1667 # the buffer.
1668 bufio.seek(pos1)
1669 bufio.write(b'\x01')
1670
1671 b = b"\x80\x81\x82\x83\x84"
1672 for i in range(0, len(b)):
1673 for j in range(i, len(b)):
1674 raw = self.BytesIO(b)
1675 bufio = self.tp(raw, 100)
1676 mutate(bufio, i, j)
1677 bufio.flush()
1678 expected = bytearray(b)
1679 expected[j] = 2
1680 expected[i] = 1
1681 self.assertEqual(raw.getvalue(), expected,
1682 "failed result for i=%d, j=%d" % (i, j))
1683
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001684 def test_truncate_after_read_or_write(self):
1685 raw = self.BytesIO(b"A" * 10)
1686 bufio = self.tp(raw, 100)
1687 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1688 self.assertEqual(bufio.truncate(), 2)
1689 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1690 self.assertEqual(bufio.truncate(), 4)
1691
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001692 def test_misbehaved_io(self):
1693 BufferedReaderTest.test_misbehaved_io(self)
1694 BufferedWriterTest.test_misbehaved_io(self)
1695
Antoine Pitroue05565e2011-08-20 14:39:23 +02001696 def test_interleaved_read_write(self):
1697 # Test for issue #12213
1698 with self.BytesIO(b'abcdefgh') as raw:
1699 with self.tp(raw, 100) as f:
1700 f.write(b"1")
1701 self.assertEqual(f.read(1), b'b')
1702 f.write(b'2')
1703 self.assertEqual(f.read1(1), b'd')
1704 f.write(b'3')
1705 buf = bytearray(1)
1706 f.readinto(buf)
1707 self.assertEqual(buf, b'f')
1708 f.write(b'4')
1709 self.assertEqual(f.peek(1), b'h')
1710 f.flush()
1711 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1712
1713 with self.BytesIO(b'abc') as raw:
1714 with self.tp(raw, 100) as f:
1715 self.assertEqual(f.read(1), b'a')
1716 f.write(b"2")
1717 self.assertEqual(f.read(1), b'c')
1718 f.flush()
1719 self.assertEqual(raw.getvalue(), b'a2c')
1720
1721 def test_interleaved_readline_write(self):
1722 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1723 with self.tp(raw) as f:
1724 f.write(b'1')
1725 self.assertEqual(f.readline(), b'b\n')
1726 f.write(b'2')
1727 self.assertEqual(f.readline(), b'def\n')
1728 f.write(b'3')
1729 self.assertEqual(f.readline(), b'\n')
1730 f.flush()
1731 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1732
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001733 # You can't construct a BufferedRandom over a non-seekable stream.
1734 test_unseekable = None
1735
R David Murray67bfe802013-02-23 21:51:05 -05001736
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001737class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001738 tp = io.BufferedRandom
1739
1740 def test_constructor(self):
1741 BufferedRandomTest.test_constructor(self)
1742 # The allocation can succeed on 32-bit builds, e.g. with more
1743 # than 2GB RAM and a 64-bit kernel.
1744 if sys.maxsize > 0x7FFFFFFF:
1745 rawio = self.MockRawIO()
1746 bufio = self.tp(rawio)
1747 self.assertRaises((OverflowError, MemoryError, ValueError),
1748 bufio.__init__, rawio, sys.maxsize)
1749
1750 def test_garbage_collection(self):
1751 CBufferedReaderTest.test_garbage_collection(self)
1752 CBufferedWriterTest.test_garbage_collection(self)
1753
R David Murray67bfe802013-02-23 21:51:05 -05001754 def test_args_error(self):
1755 # Issue #17275
1756 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
1757 self.tp(io.BytesIO(), 1024, 1024, 1024)
1758
1759
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001760class PyBufferedRandomTest(BufferedRandomTest):
1761 tp = pyio.BufferedRandom
1762
1763
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001764# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1765# properties:
1766# - A single output character can correspond to many bytes of input.
1767# - The number of input bytes to complete the character can be
1768# undetermined until the last input byte is received.
1769# - The number of input bytes can vary depending on previous input.
1770# - A single input byte can correspond to many characters of output.
1771# - The number of output characters can be undetermined until the
1772# last input byte is received.
1773# - The number of output characters can vary depending on previous input.
1774
1775class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1776 """
1777 For testing seek/tell behavior with a stateful, buffering decoder.
1778
1779 Input is a sequence of words. Words may be fixed-length (length set
1780 by input) or variable-length (period-terminated). In variable-length
1781 mode, extra periods are ignored. Possible words are:
1782 - 'i' followed by a number sets the input length, I (maximum 99).
1783 When I is set to 0, words are space-terminated.
1784 - 'o' followed by a number sets the output length, O (maximum 99).
1785 - Any other word is converted into a word followed by a period on
1786 the output. The output word consists of the input word truncated
1787 or padded out with hyphens to make its length equal to O. If O
1788 is 0, the word is output verbatim without truncating or padding.
1789 I and O are initially set to 1. When I changes, any buffered input is
1790 re-scanned according to the new I. EOF also terminates the last word.
1791 """
1792
1793 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001794 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001795 self.reset()
1796
1797 def __repr__(self):
1798 return '<SID %x>' % id(self)
1799
1800 def reset(self):
1801 self.i = 1
1802 self.o = 1
1803 self.buffer = bytearray()
1804
1805 def getstate(self):
1806 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1807 return bytes(self.buffer), i*100 + o
1808
1809 def setstate(self, state):
1810 buffer, io = state
1811 self.buffer = bytearray(buffer)
1812 i, o = divmod(io, 100)
1813 self.i, self.o = i ^ 1, o ^ 1
1814
1815 def decode(self, input, final=False):
1816 output = ''
1817 for b in input:
1818 if self.i == 0: # variable-length, terminated with period
1819 if b == ord('.'):
1820 if self.buffer:
1821 output += self.process_word()
1822 else:
1823 self.buffer.append(b)
1824 else: # fixed-length, terminate after self.i bytes
1825 self.buffer.append(b)
1826 if len(self.buffer) == self.i:
1827 output += self.process_word()
1828 if final and self.buffer: # EOF terminates the last word
1829 output += self.process_word()
1830 return output
1831
1832 def process_word(self):
1833 output = ''
1834 if self.buffer[0] == ord('i'):
1835 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1836 elif self.buffer[0] == ord('o'):
1837 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1838 else:
1839 output = self.buffer.decode('ascii')
1840 if len(output) < self.o:
1841 output += '-'*self.o # pad out with hyphens
1842 if self.o:
1843 output = output[:self.o] # truncate to output length
1844 output += '.'
1845 self.buffer = bytearray()
1846 return output
1847
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001848 codecEnabled = False
1849
1850 @classmethod
1851 def lookupTestDecoder(cls, name):
1852 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001853 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001854 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001855 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001856 incrementalencoder=None,
1857 streamreader=None, streamwriter=None,
1858 incrementaldecoder=cls)
1859
1860# Register the previous decoder for testing.
1861# Disabled by default, tests will enable it.
1862codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1863
1864
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001865class StatefulIncrementalDecoderTest(unittest.TestCase):
1866 """
1867 Make sure the StatefulIncrementalDecoder actually works.
1868 """
1869
1870 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001871 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001872 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001873 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001874 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001875 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001876 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001877 # I=0, O=6 (variable-length input, fixed-length output)
1878 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1879 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001880 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001881 # I=6, O=3 (fixed-length input > fixed-length output)
1882 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1883 # I=0, then 3; O=29, then 15 (with longer output)
1884 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1885 'a----------------------------.' +
1886 'b----------------------------.' +
1887 'cde--------------------------.' +
1888 'abcdefghijabcde.' +
1889 'a.b------------.' +
1890 '.c.------------.' +
1891 'd.e------------.' +
1892 'k--------------.' +
1893 'l--------------.' +
1894 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001895 ]
1896
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001897 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001898 # Try a few one-shot test cases.
1899 for input, eof, output in self.test_cases:
1900 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001901 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001902
1903 # Also test an unfinished decode, followed by forcing EOF.
1904 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001905 self.assertEqual(d.decode(b'oiabcd'), '')
1906 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001907
1908class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001909
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001910 def setUp(self):
1911 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1912 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001913 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001914
Guido van Rossumd0712812007-04-11 16:32:43 +00001915 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001916 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001917
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001918 def test_constructor(self):
1919 r = self.BytesIO(b"\xc3\xa9\n\n")
1920 b = self.BufferedReader(r, 1000)
1921 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001922 t.__init__(b, encoding="latin-1", newline="\r\n")
1923 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001924 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001925 t.__init__(b, encoding="utf-8", line_buffering=True)
1926 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001927 self.assertEqual(t.line_buffering, True)
1928 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001929 self.assertRaises(TypeError, t.__init__, b, newline=42)
1930 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1931
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001932 def test_detach(self):
1933 r = self.BytesIO()
1934 b = self.BufferedWriter(r)
1935 t = self.TextIOWrapper(b)
1936 self.assertIs(t.detach(), b)
1937
1938 t = self.TextIOWrapper(b, encoding="ascii")
1939 t.write("howdy")
1940 self.assertFalse(r.getvalue())
1941 t.detach()
1942 self.assertEqual(r.getvalue(), b"howdy")
1943 self.assertRaises(ValueError, t.detach)
1944
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001945 def test_repr(self):
1946 raw = self.BytesIO("hello".encode("utf-8"))
1947 b = self.BufferedReader(raw)
1948 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001949 modname = self.TextIOWrapper.__module__
1950 self.assertEqual(repr(t),
1951 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1952 raw.name = "dummy"
1953 self.assertEqual(repr(t),
1954 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001955 t.mode = "r"
1956 self.assertEqual(repr(t),
1957 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001958 raw.name = b"dummy"
1959 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001960 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001961
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001962 def test_line_buffering(self):
1963 r = self.BytesIO()
1964 b = self.BufferedWriter(r, 1000)
1965 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001966 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001967 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001968 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001969 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001970 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001971 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001972
Victor Stinnerf86a5e82012-06-05 13:43:22 +02001973 def test_default_encoding(self):
1974 old_environ = dict(os.environ)
1975 try:
1976 # try to get a user preferred encoding different than the current
1977 # locale encoding to check that TextIOWrapper() uses the current
1978 # locale encoding and not the user preferred encoding
1979 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
1980 if key in os.environ:
1981 del os.environ[key]
1982
1983 current_locale_encoding = locale.getpreferredencoding(False)
1984 b = self.BytesIO()
1985 t = self.TextIOWrapper(b)
1986 self.assertEqual(t.encoding, current_locale_encoding)
1987 finally:
1988 os.environ.clear()
1989 os.environ.update(old_environ)
1990
Serhiy Storchaka78980432013-01-15 01:12:17 +02001991 # Issue 15989
1992 def test_device_encoding(self):
1993 b = self.BytesIO()
1994 b.fileno = lambda: _testcapi.INT_MAX + 1
1995 self.assertRaises(OverflowError, self.TextIOWrapper, b)
1996 b.fileno = lambda: _testcapi.UINT_MAX + 1
1997 self.assertRaises(OverflowError, self.TextIOWrapper, b)
1998
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001999 def test_encoding(self):
2000 # Check the encoding attribute is always set, and valid
2001 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002002 t = self.TextIOWrapper(b, encoding="utf-8")
2003 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002004 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002005 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002006 codecs.lookup(t.encoding)
2007
2008 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002009 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002010 b = self.BytesIO(b"abc\n\xff\n")
2011 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002012 self.assertRaises(UnicodeError, t.read)
2013 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002014 b = self.BytesIO(b"abc\n\xff\n")
2015 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002016 self.assertRaises(UnicodeError, t.read)
2017 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002018 b = self.BytesIO(b"abc\n\xff\n")
2019 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002020 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002021 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002022 b = self.BytesIO(b"abc\n\xff\n")
2023 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002024 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002025
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002026 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002027 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002028 b = self.BytesIO()
2029 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002030 self.assertRaises(UnicodeError, t.write, "\xff")
2031 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002032 b = self.BytesIO()
2033 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002034 self.assertRaises(UnicodeError, t.write, "\xff")
2035 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002036 b = self.BytesIO()
2037 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002038 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002039 t.write("abc\xffdef\n")
2040 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002041 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002042 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002043 b = self.BytesIO()
2044 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002045 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002046 t.write("abc\xffdef\n")
2047 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002048 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002049
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002050 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002051 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2052
2053 tests = [
2054 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002055 [ '', input_lines ],
2056 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2057 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2058 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002059 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002060 encodings = (
2061 'utf-8', 'latin-1',
2062 'utf-16', 'utf-16-le', 'utf-16-be',
2063 'utf-32', 'utf-32-le', 'utf-32-be',
2064 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002065
Guido van Rossum8358db22007-08-18 21:39:55 +00002066 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002067 # character in TextIOWrapper._pending_line.
2068 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002069 # XXX: str.encode() should return bytes
2070 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002071 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002072 for bufsize in range(1, 10):
2073 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002074 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2075 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002076 encoding=encoding)
2077 if do_reads:
2078 got_lines = []
2079 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002080 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002081 if c2 == '':
2082 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002083 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002084 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002085 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002086 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002087
2088 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002089 self.assertEqual(got_line, exp_line)
2090 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002091
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002092 def test_newlines_input(self):
2093 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002094 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2095 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002096 (None, normalized.decode("ascii").splitlines(keepends=True)),
2097 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002098 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2099 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2100 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002101 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002102 buf = self.BytesIO(testdata)
2103 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002104 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002105 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002106 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002107
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002108 def test_newlines_output(self):
2109 testdict = {
2110 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2111 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2112 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2113 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2114 }
2115 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2116 for newline, expected in tests:
2117 buf = self.BytesIO()
2118 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2119 txt.write("AAA\nB")
2120 txt.write("BB\nCCC\n")
2121 txt.write("X\rY\r\nZ")
2122 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002123 self.assertEqual(buf.closed, False)
2124 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002125
2126 def test_destructor(self):
2127 l = []
2128 base = self.BytesIO
2129 class MyBytesIO(base):
2130 def close(self):
2131 l.append(self.getvalue())
2132 base.close(self)
2133 b = MyBytesIO()
2134 t = self.TextIOWrapper(b, encoding="ascii")
2135 t.write("abc")
2136 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002137 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002138 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002139
2140 def test_override_destructor(self):
2141 record = []
2142 class MyTextIO(self.TextIOWrapper):
2143 def __del__(self):
2144 record.append(1)
2145 try:
2146 f = super().__del__
2147 except AttributeError:
2148 pass
2149 else:
2150 f()
2151 def close(self):
2152 record.append(2)
2153 super().close()
2154 def flush(self):
2155 record.append(3)
2156 super().flush()
2157 b = self.BytesIO()
2158 t = MyTextIO(b, encoding="ascii")
2159 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002160 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002161 self.assertEqual(record, [1, 2, 3])
2162
2163 def test_error_through_destructor(self):
2164 # Test that the exception state is not modified by a destructor,
2165 # even if close() fails.
2166 rawio = self.CloseFailureIO()
2167 def f():
2168 self.TextIOWrapper(rawio).xyzzy
2169 with support.captured_output("stderr") as s:
2170 self.assertRaises(AttributeError, f)
2171 s = s.getvalue().strip()
2172 if s:
2173 # The destructor *may* have printed an unraisable error, check it
2174 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002175 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002176 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002177
Guido van Rossum9b76da62007-04-11 01:09:03 +00002178 # Systematic tests of the text I/O API
2179
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002180 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002181 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002182 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002183 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002184 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002185 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002186 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002187 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002188 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002189 self.assertEqual(f.tell(), 0)
2190 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002191 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002192 self.assertEqual(f.seek(0), 0)
2193 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002194 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002195 self.assertEqual(f.read(2), "ab")
2196 self.assertEqual(f.read(1), "c")
2197 self.assertEqual(f.read(1), "")
2198 self.assertEqual(f.read(), "")
2199 self.assertEqual(f.tell(), cookie)
2200 self.assertEqual(f.seek(0), 0)
2201 self.assertEqual(f.seek(0, 2), cookie)
2202 self.assertEqual(f.write("def"), 3)
2203 self.assertEqual(f.seek(cookie), cookie)
2204 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002205 if enc.startswith("utf"):
2206 self.multi_line_test(f, enc)
2207 f.close()
2208
2209 def multi_line_test(self, f, enc):
2210 f.seek(0)
2211 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002212 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002213 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002214 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002215 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002216 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002217 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002218 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002219 wlines.append((f.tell(), line))
2220 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002221 f.seek(0)
2222 rlines = []
2223 while True:
2224 pos = f.tell()
2225 line = f.readline()
2226 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002227 break
2228 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002229 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002230
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002231 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002232 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002233 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002234 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002235 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002236 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002237 p2 = f.tell()
2238 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002239 self.assertEqual(f.tell(), p0)
2240 self.assertEqual(f.readline(), "\xff\n")
2241 self.assertEqual(f.tell(), p1)
2242 self.assertEqual(f.readline(), "\xff\n")
2243 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002244 f.seek(0)
2245 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002246 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002247 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002248 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002249 f.close()
2250
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002251 def test_seeking(self):
2252 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002253 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002254 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002255 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002256 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002257 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002258 suffix = bytes(u_suffix.encode("utf-8"))
2259 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002260 with self.open(support.TESTFN, "wb") as f:
2261 f.write(line*2)
2262 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2263 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002264 self.assertEqual(s, str(prefix, "ascii"))
2265 self.assertEqual(f.tell(), prefix_size)
2266 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002267
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002268 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002269 # Regression test for a specific bug
2270 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002271 with self.open(support.TESTFN, "wb") as f:
2272 f.write(data)
2273 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2274 f._CHUNK_SIZE # Just test that it exists
2275 f._CHUNK_SIZE = 2
2276 f.readline()
2277 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002278
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002279 def test_seek_and_tell(self):
2280 #Test seek/tell using the StatefulIncrementalDecoder.
2281 # Make test faster by doing smaller seeks
2282 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002283
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002284 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002285 """Tell/seek to various points within a data stream and ensure
2286 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002287 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002288 f.write(data)
2289 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002290 f = self.open(support.TESTFN, encoding='test_decoder')
2291 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002292 decoded = f.read()
2293 f.close()
2294
Neal Norwitze2b07052008-03-18 19:52:05 +00002295 for i in range(min_pos, len(decoded) + 1): # seek positions
2296 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002297 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002298 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002299 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002300 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002301 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002302 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002303 f.close()
2304
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002305 # Enable the test decoder.
2306 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002307
2308 # Run the tests.
2309 try:
2310 # Try each test case.
2311 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002312 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002313
2314 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002315 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2316 offset = CHUNK_SIZE - len(input)//2
2317 prefix = b'.'*offset
2318 # Don't bother seeking into the prefix (takes too long).
2319 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002320 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002321
2322 # Ensure our test decoder won't interfere with subsequent tests.
2323 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002324 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002325
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002326 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002327 data = "1234567890"
2328 tests = ("utf-16",
2329 "utf-16-le",
2330 "utf-16-be",
2331 "utf-32",
2332 "utf-32-le",
2333 "utf-32-be")
2334 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002335 buf = self.BytesIO()
2336 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002337 # Check if the BOM is written only once (see issue1753).
2338 f.write(data)
2339 f.write(data)
2340 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002341 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002342 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002343 self.assertEqual(f.read(), data * 2)
2344 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002345
Benjamin Petersona1b49012009-03-31 23:11:32 +00002346 def test_unreadable(self):
2347 class UnReadable(self.BytesIO):
2348 def readable(self):
2349 return False
2350 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002351 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002352
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002353 def test_read_one_by_one(self):
2354 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002355 reads = ""
2356 while True:
2357 c = txt.read(1)
2358 if not c:
2359 break
2360 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002361 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002362
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002363 def test_readlines(self):
2364 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2365 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2366 txt.seek(0)
2367 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2368 txt.seek(0)
2369 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2370
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002371 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002372 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002373 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002374 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002375 reads = ""
2376 while True:
2377 c = txt.read(128)
2378 if not c:
2379 break
2380 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002381 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002382
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002383 def test_writelines(self):
2384 l = ['ab', 'cd', 'ef']
2385 buf = self.BytesIO()
2386 txt = self.TextIOWrapper(buf)
2387 txt.writelines(l)
2388 txt.flush()
2389 self.assertEqual(buf.getvalue(), b'abcdef')
2390
2391 def test_writelines_userlist(self):
2392 l = UserList(['ab', 'cd', 'ef'])
2393 buf = self.BytesIO()
2394 txt = self.TextIOWrapper(buf)
2395 txt.writelines(l)
2396 txt.flush()
2397 self.assertEqual(buf.getvalue(), b'abcdef')
2398
2399 def test_writelines_error(self):
2400 txt = self.TextIOWrapper(self.BytesIO())
2401 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2402 self.assertRaises(TypeError, txt.writelines, None)
2403 self.assertRaises(TypeError, txt.writelines, b'abc')
2404
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002405 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002406 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002407
2408 # read one char at a time
2409 reads = ""
2410 while True:
2411 c = txt.read(1)
2412 if not c:
2413 break
2414 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002415 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002416
2417 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002418 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002419 txt._CHUNK_SIZE = 4
2420
2421 reads = ""
2422 while True:
2423 c = txt.read(4)
2424 if not c:
2425 break
2426 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002427 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002428
2429 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002430 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002431 txt._CHUNK_SIZE = 4
2432
2433 reads = txt.read(4)
2434 reads += txt.read(4)
2435 reads += txt.readline()
2436 reads += txt.readline()
2437 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002438 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002439
2440 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002441 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002442 txt._CHUNK_SIZE = 4
2443
2444 reads = txt.read(4)
2445 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002446 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002447
2448 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002449 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002450 txt._CHUNK_SIZE = 4
2451
2452 reads = txt.read(4)
2453 pos = txt.tell()
2454 txt.seek(0)
2455 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002456 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002457
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002458 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002459 buffer = self.BytesIO(self.testdata)
2460 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002461
2462 self.assertEqual(buffer.seekable(), txt.seekable())
2463
Antoine Pitroue4501852009-05-14 18:55:55 +00002464 def test_append_bom(self):
2465 # The BOM is not written again when appending to a non-empty file
2466 filename = support.TESTFN
2467 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2468 with self.open(filename, 'w', encoding=charset) as f:
2469 f.write('aaa')
2470 pos = f.tell()
2471 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002472 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002473
2474 with self.open(filename, 'a', encoding=charset) as f:
2475 f.write('xxx')
2476 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002477 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002478
2479 def test_seek_bom(self):
2480 # Same test, but when seeking manually
2481 filename = support.TESTFN
2482 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2483 with self.open(filename, 'w', encoding=charset) as f:
2484 f.write('aaa')
2485 pos = f.tell()
2486 with self.open(filename, 'r+', encoding=charset) as f:
2487 f.seek(pos)
2488 f.write('zzz')
2489 f.seek(0)
2490 f.write('bbb')
2491 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002492 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002493
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002494 def test_errors_property(self):
2495 with self.open(support.TESTFN, "w") as f:
2496 self.assertEqual(f.errors, "strict")
2497 with self.open(support.TESTFN, "w", errors="replace") as f:
2498 self.assertEqual(f.errors, "replace")
2499
Brett Cannon31f59292011-02-21 19:29:56 +00002500 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002501 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002502 def test_threads_write(self):
2503 # Issue6750: concurrent writes could duplicate data
2504 event = threading.Event()
2505 with self.open(support.TESTFN, "w", buffering=1) as f:
2506 def run(n):
2507 text = "Thread%03d\n" % n
2508 event.wait()
2509 f.write(text)
2510 threads = [threading.Thread(target=lambda n=x: run(n))
2511 for x in range(20)]
2512 for t in threads:
2513 t.start()
2514 time.sleep(0.02)
2515 event.set()
2516 for t in threads:
2517 t.join()
2518 with self.open(support.TESTFN) as f:
2519 content = f.read()
2520 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002521 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002522
Antoine Pitrou6be88762010-05-03 16:48:20 +00002523 def test_flush_error_on_close(self):
2524 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2525 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002526 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002527 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002528 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002529 self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002530
2531 def test_multi_close(self):
2532 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2533 txt.close()
2534 txt.close()
2535 txt.close()
2536 self.assertRaises(ValueError, txt.flush)
2537
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002538 def test_unseekable(self):
2539 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2540 self.assertRaises(self.UnsupportedOperation, txt.tell)
2541 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2542
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002543 def test_readonly_attributes(self):
2544 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2545 buf = self.BytesIO(self.testdata)
2546 with self.assertRaises(AttributeError):
2547 txt.buffer = buf
2548
Antoine Pitroue96ec682011-07-23 21:46:35 +02002549 def test_rawio(self):
2550 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2551 # that subprocess.Popen() can have the required unbuffered
2552 # semantics with universal_newlines=True.
2553 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2554 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2555 # Reads
2556 self.assertEqual(txt.read(4), 'abcd')
2557 self.assertEqual(txt.readline(), 'efghi\n')
2558 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2559
2560 def test_rawio_write_through(self):
2561 # Issue #12591: with write_through=True, writes don't need a flush
2562 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2563 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2564 write_through=True)
2565 txt.write('1')
2566 txt.write('23\n4')
2567 txt.write('5')
2568 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2569
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002570 def test_read_nonbytes(self):
2571 # Issue #17106
2572 # Crash when underlying read() returns non-bytes
2573 t = self.TextIOWrapper(self.StringIO('a'))
2574 self.assertRaises(TypeError, t.read, 1)
2575 t = self.TextIOWrapper(self.StringIO('a'))
2576 self.assertRaises(TypeError, t.readline)
2577 t = self.TextIOWrapper(self.StringIO('a'))
2578 self.assertRaises(TypeError, t.read)
2579
2580 def test_illegal_decoder(self):
2581 # Issue #17106
2582 # Crash when decoder returns non-string
2583 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2584 encoding='quopri_codec')
2585 self.assertRaises(TypeError, t.read, 1)
2586 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2587 encoding='quopri_codec')
2588 self.assertRaises(TypeError, t.readline)
2589 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2590 encoding='quopri_codec')
2591 self.assertRaises(TypeError, t.read)
2592
Antoine Pitrou712cb732013-12-21 15:51:54 +01002593 def _check_create_at_shutdown(self, **kwargs):
2594 # Issue #20037: creating a TextIOWrapper at shutdown
2595 # shouldn't crash the interpreter.
2596 iomod = self.io.__name__
2597 code = """if 1:
2598 import codecs
2599 import {iomod} as io
2600
2601 # Avoid looking up codecs at shutdown
2602 codecs.lookup('utf-8')
2603
2604 class C:
2605 def __init__(self):
2606 self.buf = io.BytesIO()
2607 def __del__(self):
2608 io.TextIOWrapper(self.buf, **{kwargs})
2609 print("ok")
2610 c = C()
2611 """.format(iomod=iomod, kwargs=kwargs)
2612 return assert_python_ok("-c", code)
2613
2614 def test_create_at_shutdown_without_encoding(self):
2615 rc, out, err = self._check_create_at_shutdown()
2616 if err:
2617 # Can error out with a RuntimeError if the module state
2618 # isn't found.
2619 self.assertIn("RuntimeError: could not find io module state",
2620 err.decode())
2621 else:
2622 self.assertEqual("ok", out.decode().strip())
2623
2624 def test_create_at_shutdown_with_encoding(self):
2625 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
2626 errors='strict')
2627 self.assertFalse(err)
2628 self.assertEqual("ok", out.decode().strip())
2629
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002630
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002631class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01002632 io = io
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002633
2634 def test_initialization(self):
2635 r = self.BytesIO(b"\xc3\xa9\n\n")
2636 b = self.BufferedReader(r, 1000)
2637 t = self.TextIOWrapper(b)
2638 self.assertRaises(TypeError, t.__init__, b, newline=42)
2639 self.assertRaises(ValueError, t.read)
2640 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2641 self.assertRaises(ValueError, t.read)
2642
2643 def test_garbage_collection(self):
2644 # C TextIOWrapper objects are collected, and collecting them flushes
2645 # all data to disk.
2646 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02002647 with support.check_warnings(('', ResourceWarning)):
2648 rawio = io.FileIO(support.TESTFN, "wb")
2649 b = self.BufferedWriter(rawio)
2650 t = self.TextIOWrapper(b, encoding="ascii")
2651 t.write("456def")
2652 t.x = t
2653 wr = weakref.ref(t)
2654 del t
2655 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002656 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002657 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002658 self.assertEqual(f.read(), b"456def")
2659
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002660 def test_rwpair_cleared_before_textio(self):
2661 # Issue 13070: TextIOWrapper's finalization would crash when called
2662 # after the reference to the underlying BufferedRWPair's writer got
2663 # cleared by the GC.
2664 for i in range(1000):
2665 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2666 t1 = self.TextIOWrapper(b1, encoding="ascii")
2667 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2668 t2 = self.TextIOWrapper(b2, encoding="ascii")
2669 # circular references
2670 t1.buddy = t2
2671 t2.buddy = t1
2672 support.gc_collect()
2673
2674
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002675class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01002676 io = pyio
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002677
2678
2679class IncrementalNewlineDecoderTest(unittest.TestCase):
2680
2681 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002682 # UTF-8 specific tests for a newline decoder
2683 def _check_decode(b, s, **kwargs):
2684 # We exercise getstate() / setstate() as well as decode()
2685 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002686 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002687 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002688 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002689
Antoine Pitrou180a3362008-12-14 16:36:46 +00002690 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002691
Antoine Pitrou180a3362008-12-14 16:36:46 +00002692 _check_decode(b'\xe8', "")
2693 _check_decode(b'\xa2', "")
2694 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002695
Antoine Pitrou180a3362008-12-14 16:36:46 +00002696 _check_decode(b'\xe8', "")
2697 _check_decode(b'\xa2', "")
2698 _check_decode(b'\x88', "\u8888")
2699
2700 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002701 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2702
Antoine Pitrou180a3362008-12-14 16:36:46 +00002703 decoder.reset()
2704 _check_decode(b'\n', "\n")
2705 _check_decode(b'\r', "")
2706 _check_decode(b'', "\n", final=True)
2707 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002708
Antoine Pitrou180a3362008-12-14 16:36:46 +00002709 _check_decode(b'\r', "")
2710 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002711
Antoine Pitrou180a3362008-12-14 16:36:46 +00002712 _check_decode(b'\r\r\n', "\n\n")
2713 _check_decode(b'\r', "")
2714 _check_decode(b'\r', "\n")
2715 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002716
Antoine Pitrou180a3362008-12-14 16:36:46 +00002717 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2718 _check_decode(b'\xe8\xa2\x88', "\u8888")
2719 _check_decode(b'\n', "\n")
2720 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2721 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002722
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002723 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002724 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002725 if encoding is not None:
2726 encoder = codecs.getincrementalencoder(encoding)()
2727 def _decode_bytewise(s):
2728 # Decode one byte at a time
2729 for b in encoder.encode(s):
2730 result.append(decoder.decode(bytes([b])))
2731 else:
2732 encoder = None
2733 def _decode_bytewise(s):
2734 # Decode one char at a time
2735 for c in s:
2736 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002737 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002738 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002739 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002740 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002741 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002742 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002743 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002744 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002745 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002746 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002747 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002748 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002749 input = "abc"
2750 if encoder is not None:
2751 encoder.reset()
2752 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002753 self.assertEqual(decoder.decode(input), "abc")
2754 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002755
2756 def test_newline_decoder(self):
2757 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002758 # None meaning the IncrementalNewlineDecoder takes unicode input
2759 # rather than bytes input
2760 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002761 'utf-16', 'utf-16-le', 'utf-16-be',
2762 'utf-32', 'utf-32-le', 'utf-32-be',
2763 )
2764 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002765 decoder = enc and codecs.getincrementaldecoder(enc)()
2766 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2767 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002768 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002769 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2770 self.check_newline_decoding_utf8(decoder)
2771
Antoine Pitrou66913e22009-03-06 23:40:56 +00002772 def test_newline_bytes(self):
2773 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2774 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002775 self.assertEqual(dec.newlines, None)
2776 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2777 self.assertEqual(dec.newlines, None)
2778 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2779 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002780 dec = self.IncrementalNewlineDecoder(None, translate=False)
2781 _check(dec)
2782 dec = self.IncrementalNewlineDecoder(None, translate=True)
2783 _check(dec)
2784
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002785class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2786 pass
2787
2788class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2789 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002790
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002791
Guido van Rossum01a27522007-03-07 01:00:12 +00002792# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002793
Guido van Rossum5abbf752007-08-27 17:39:33 +00002794class MiscIOTest(unittest.TestCase):
2795
Barry Warsaw40e82462008-11-20 20:14:50 +00002796 def tearDown(self):
2797 support.unlink(support.TESTFN)
2798
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002799 def test___all__(self):
2800 for name in self.io.__all__:
2801 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002802 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002803 if name == "open":
2804 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002805 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002806 self.assertTrue(issubclass(obj, Exception), name)
2807 elif not name.startswith("SEEK_"):
2808 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002809
Barry Warsaw40e82462008-11-20 20:14:50 +00002810 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002811 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002812 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002813 f.close()
2814
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02002815 with support.check_warnings(('', DeprecationWarning)):
2816 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002817 self.assertEqual(f.name, support.TESTFN)
2818 self.assertEqual(f.buffer.name, support.TESTFN)
2819 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2820 self.assertEqual(f.mode, "U")
2821 self.assertEqual(f.buffer.mode, "rb")
2822 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002823 f.close()
2824
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002825 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002826 self.assertEqual(f.mode, "w+")
2827 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2828 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002829
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002830 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002831 self.assertEqual(g.mode, "wb")
2832 self.assertEqual(g.raw.mode, "wb")
2833 self.assertEqual(g.name, f.fileno())
2834 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002835 f.close()
2836 g.close()
2837
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002838 def test_io_after_close(self):
2839 for kwargs in [
2840 {"mode": "w"},
2841 {"mode": "wb"},
2842 {"mode": "w", "buffering": 1},
2843 {"mode": "w", "buffering": 2},
2844 {"mode": "wb", "buffering": 0},
2845 {"mode": "r"},
2846 {"mode": "rb"},
2847 {"mode": "r", "buffering": 1},
2848 {"mode": "r", "buffering": 2},
2849 {"mode": "rb", "buffering": 0},
2850 {"mode": "w+"},
2851 {"mode": "w+b"},
2852 {"mode": "w+", "buffering": 1},
2853 {"mode": "w+", "buffering": 2},
2854 {"mode": "w+b", "buffering": 0},
2855 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002856 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002857 f.close()
2858 self.assertRaises(ValueError, f.flush)
2859 self.assertRaises(ValueError, f.fileno)
2860 self.assertRaises(ValueError, f.isatty)
2861 self.assertRaises(ValueError, f.__iter__)
2862 if hasattr(f, "peek"):
2863 self.assertRaises(ValueError, f.peek, 1)
2864 self.assertRaises(ValueError, f.read)
2865 if hasattr(f, "read1"):
2866 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002867 if hasattr(f, "readall"):
2868 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002869 if hasattr(f, "readinto"):
2870 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2871 self.assertRaises(ValueError, f.readline)
2872 self.assertRaises(ValueError, f.readlines)
2873 self.assertRaises(ValueError, f.seek, 0)
2874 self.assertRaises(ValueError, f.tell)
2875 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002876 self.assertRaises(ValueError, f.write,
2877 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002878 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002879 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002880
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002881 def test_blockingioerror(self):
2882 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002883 class C(str):
2884 pass
2885 c = C("")
2886 b = self.BlockingIOError(1, c)
2887 c.b = b
2888 b.c = c
2889 wr = weakref.ref(c)
2890 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002891 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002892 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002893
2894 def test_abcs(self):
2895 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002896 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2897 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2898 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2899 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002900
2901 def _check_abc_inheritance(self, abcmodule):
2902 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002903 self.assertIsInstance(f, abcmodule.IOBase)
2904 self.assertIsInstance(f, abcmodule.RawIOBase)
2905 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2906 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002907 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002908 self.assertIsInstance(f, abcmodule.IOBase)
2909 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2910 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2911 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002912 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002913 self.assertIsInstance(f, abcmodule.IOBase)
2914 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2915 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2916 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002917
2918 def test_abc_inheritance(self):
2919 # Test implementations inherit from their respective ABCs
2920 self._check_abc_inheritance(self)
2921
2922 def test_abc_inheritance_official(self):
2923 # Test implementations inherit from the official ABCs of the
2924 # baseline "io" module.
2925 self._check_abc_inheritance(io)
2926
Antoine Pitroue033e062010-10-29 10:38:18 +00002927 def _check_warn_on_dealloc(self, *args, **kwargs):
2928 f = open(*args, **kwargs)
2929 r = repr(f)
2930 with self.assertWarns(ResourceWarning) as cm:
2931 f = None
2932 support.gc_collect()
2933 self.assertIn(r, str(cm.warning.args[0]))
2934
2935 def test_warn_on_dealloc(self):
2936 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2937 self._check_warn_on_dealloc(support.TESTFN, "wb")
2938 self._check_warn_on_dealloc(support.TESTFN, "w")
2939
2940 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2941 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002942 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002943 for fd in fds:
2944 try:
2945 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02002946 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00002947 if e.errno != errno.EBADF:
2948 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002949 self.addCleanup(cleanup_fds)
2950 r, w = os.pipe()
2951 fds += r, w
2952 self._check_warn_on_dealloc(r, *args, **kwargs)
2953 # When using closefd=False, there's no warning
2954 r, w = os.pipe()
2955 fds += r, w
2956 with warnings.catch_warnings(record=True) as recorded:
2957 open(r, *args, closefd=False, **kwargs)
2958 support.gc_collect()
2959 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002960
2961 def test_warn_on_dealloc_fd(self):
2962 self._check_warn_on_dealloc_fd("rb", buffering=0)
2963 self._check_warn_on_dealloc_fd("rb")
2964 self._check_warn_on_dealloc_fd("r")
2965
2966
Antoine Pitrou243757e2010-11-05 21:15:39 +00002967 def test_pickling(self):
2968 # Pickling file objects is forbidden
2969 for kwargs in [
2970 {"mode": "w"},
2971 {"mode": "wb"},
2972 {"mode": "wb", "buffering": 0},
2973 {"mode": "r"},
2974 {"mode": "rb"},
2975 {"mode": "rb", "buffering": 0},
2976 {"mode": "w+"},
2977 {"mode": "w+b"},
2978 {"mode": "w+b", "buffering": 0},
2979 ]:
2980 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2981 with self.open(support.TESTFN, **kwargs) as f:
2982 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2983
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002984 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2985 def test_nonblock_pipe_write_bigbuf(self):
2986 self._test_nonblock_pipe_write(16*1024)
2987
2988 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2989 def test_nonblock_pipe_write_smallbuf(self):
2990 self._test_nonblock_pipe_write(1024)
2991
2992 def _set_non_blocking(self, fd):
2993 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2994 self.assertNotEqual(flags, -1)
2995 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2996 self.assertEqual(res, 0)
2997
2998 def _test_nonblock_pipe_write(self, bufsize):
2999 sent = []
3000 received = []
3001 r, w = os.pipe()
3002 self._set_non_blocking(r)
3003 self._set_non_blocking(w)
3004
3005 # To exercise all code paths in the C implementation we need
3006 # to play with buffer sizes. For instance, if we choose a
3007 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3008 # then we will never get a partial write of the buffer.
3009 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3010 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3011
3012 with rf, wf:
3013 for N in 9999, 73, 7574:
3014 try:
3015 i = 0
3016 while True:
3017 msg = bytes([i % 26 + 97]) * N
3018 sent.append(msg)
3019 wf.write(msg)
3020 i += 1
3021
3022 except self.BlockingIOError as e:
3023 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003024 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003025 sent[-1] = sent[-1][:e.characters_written]
3026 received.append(rf.read())
3027 msg = b'BLOCKED'
3028 wf.write(msg)
3029 sent.append(msg)
3030
3031 while True:
3032 try:
3033 wf.flush()
3034 break
3035 except self.BlockingIOError as e:
3036 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003037 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003038 self.assertEqual(e.characters_written, 0)
3039 received.append(rf.read())
3040
3041 received += iter(rf.read, None)
3042
3043 sent, received = b''.join(sent), b''.join(received)
3044 self.assertTrue(sent == received)
3045 self.assertTrue(wf.closed)
3046 self.assertTrue(rf.closed)
3047
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003048 def test_create_fail(self):
3049 # 'x' mode fails if file is existing
3050 with self.open(support.TESTFN, 'w'):
3051 pass
3052 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3053
3054 def test_create_writes(self):
3055 # 'x' mode opens for writing
3056 with self.open(support.TESTFN, 'xb') as f:
3057 f.write(b"spam")
3058 with self.open(support.TESTFN, 'rb') as f:
3059 self.assertEqual(b"spam", f.read())
3060
Christian Heimes7b648752012-09-10 14:48:43 +02003061 def test_open_allargs(self):
3062 # there used to be a buffer overflow in the parser for rawmode
3063 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3064
3065
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003066class CMiscIOTest(MiscIOTest):
3067 io = io
3068
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03003069 def test_readinto_buffer_overflow(self):
3070 # Issue #18025
3071 class BadReader(self.io.BufferedIOBase):
3072 def read(self, n=-1):
3073 return b'x' * 10**6
3074 bufio = BadReader()
3075 b = bytearray(2)
3076 self.assertRaises(ValueError, bufio.readinto, b)
3077
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003078class PyMiscIOTest(MiscIOTest):
3079 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003080
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003081
3082@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3083class SignalsTest(unittest.TestCase):
3084
3085 def setUp(self):
3086 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3087
3088 def tearDown(self):
3089 signal.signal(signal.SIGALRM, self.oldalrm)
3090
3091 def alarm_interrupt(self, sig, frame):
3092 1/0
3093
3094 @unittest.skipUnless(threading, 'Threading required for this test.')
3095 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3096 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003097 invokes the signal handler, and bubbles up the exception raised
3098 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003099 read_results = []
3100 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003101 if hasattr(signal, 'pthread_sigmask'):
3102 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003103 s = os.read(r, 1)
3104 read_results.append(s)
3105 t = threading.Thread(target=_read)
3106 t.daemon = True
3107 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003108 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003109 try:
3110 wio = self.io.open(w, **fdopen_kwargs)
3111 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003112 # Fill the pipe enough that the write will be blocking.
3113 # It will be interrupted by the timer armed above. Since the
3114 # other thread has read one byte, the low-level write will
3115 # return with a successful (partial) result rather than an EINTR.
3116 # The buffered IO layer must check for pending signal
3117 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003118 signal.alarm(1)
3119 try:
3120 self.assertRaises(ZeroDivisionError,
3121 wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
3122 finally:
3123 signal.alarm(0)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003124 t.join()
3125 # We got one byte, get another one and check that it isn't a
3126 # repeat of the first one.
3127 read_results.append(os.read(r, 1))
3128 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3129 finally:
3130 os.close(w)
3131 os.close(r)
3132 # This is deliberate. If we didn't close the file descriptor
3133 # before closing wio, wio would try to flush its internal
3134 # buffer, and block again.
3135 try:
3136 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003137 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003138 if e.errno != errno.EBADF:
3139 raise
3140
3141 def test_interrupted_write_unbuffered(self):
3142 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3143
3144 def test_interrupted_write_buffered(self):
3145 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3146
3147 def test_interrupted_write_text(self):
3148 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3149
Brett Cannon31f59292011-02-21 19:29:56 +00003150 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003151 def check_reentrant_write(self, data, **fdopen_kwargs):
3152 def on_alarm(*args):
3153 # Will be called reentrantly from the same thread
3154 wio.write(data)
3155 1/0
3156 signal.signal(signal.SIGALRM, on_alarm)
3157 r, w = os.pipe()
3158 wio = self.io.open(w, **fdopen_kwargs)
3159 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003160 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003161 # Either the reentrant call to wio.write() fails with RuntimeError,
3162 # or the signal handler raises ZeroDivisionError.
3163 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3164 while 1:
3165 for i in range(100):
3166 wio.write(data)
3167 wio.flush()
3168 # Make sure the buffer doesn't fill up and block further writes
3169 os.read(r, len(data) * 100)
3170 exc = cm.exception
3171 if isinstance(exc, RuntimeError):
3172 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3173 finally:
3174 wio.close()
3175 os.close(r)
3176
3177 def test_reentrant_write_buffered(self):
3178 self.check_reentrant_write(b"xy", mode="wb")
3179
3180 def test_reentrant_write_text(self):
3181 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3182
Antoine Pitrou707ce822011-02-25 21:24:11 +00003183 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3184 """Check that a buffered read, when it gets interrupted (either
3185 returning a partial result or EINTR), properly invokes the signal
3186 handler and retries if the latter returned successfully."""
3187 r, w = os.pipe()
3188 fdopen_kwargs["closefd"] = False
3189 def alarm_handler(sig, frame):
3190 os.write(w, b"bar")
3191 signal.signal(signal.SIGALRM, alarm_handler)
3192 try:
3193 rio = self.io.open(r, **fdopen_kwargs)
3194 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003195 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003196 # Expected behaviour:
3197 # - first raw read() returns partial b"foo"
3198 # - second raw read() returns EINTR
3199 # - third raw read() returns b"bar"
3200 self.assertEqual(decode(rio.read(6)), "foobar")
3201 finally:
3202 rio.close()
3203 os.close(w)
3204 os.close(r)
3205
Antoine Pitrou20db5112011-08-19 20:32:34 +02003206 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003207 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3208 mode="rb")
3209
Antoine Pitrou20db5112011-08-19 20:32:34 +02003210 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003211 self.check_interrupted_read_retry(lambda x: x,
3212 mode="r")
3213
3214 @unittest.skipUnless(threading, 'Threading required for this test.')
3215 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3216 """Check that a buffered write, when it gets interrupted (either
3217 returning a partial result or EINTR), properly invokes the signal
3218 handler and retries if the latter returned successfully."""
3219 select = support.import_module("select")
3220 # A quantity that exceeds the buffer size of an anonymous pipe's
3221 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02003222 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00003223 r, w = os.pipe()
3224 fdopen_kwargs["closefd"] = False
3225 # We need a separate thread to read from the pipe and allow the
3226 # write() to finish. This thread is started after the SIGALRM is
3227 # received (forcing a first EINTR in write()).
3228 read_results = []
3229 write_finished = False
3230 def _read():
3231 while not write_finished:
3232 while r in select.select([r], [], [], 1.0)[0]:
3233 s = os.read(r, 1024)
3234 read_results.append(s)
3235 t = threading.Thread(target=_read)
3236 t.daemon = True
3237 def alarm1(sig, frame):
3238 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003239 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003240 def alarm2(sig, frame):
3241 t.start()
3242 signal.signal(signal.SIGALRM, alarm1)
3243 try:
3244 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003245 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003246 # Expected behaviour:
3247 # - first raw write() is partial (because of the limited pipe buffer
3248 # and the first alarm)
3249 # - second raw write() returns EINTR (because of the second alarm)
3250 # - subsequent write()s are successful (either partial or complete)
3251 self.assertEqual(N, wio.write(item * N))
3252 wio.flush()
3253 write_finished = True
3254 t.join()
3255 self.assertEqual(N, sum(len(x) for x in read_results))
3256 finally:
3257 write_finished = True
3258 os.close(w)
3259 os.close(r)
3260 # This is deliberate. If we didn't close the file descriptor
3261 # before closing wio, wio would try to flush its internal
3262 # buffer, and could block (in case of failure).
3263 try:
3264 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003265 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00003266 if e.errno != errno.EBADF:
3267 raise
3268
Antoine Pitrou20db5112011-08-19 20:32:34 +02003269 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003270 self.check_interrupted_write_retry(b"x", mode="wb")
3271
Antoine Pitrou20db5112011-08-19 20:32:34 +02003272 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003273 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3274
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003275
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003276class CSignalsTest(SignalsTest):
3277 io = io
3278
3279class PySignalsTest(SignalsTest):
3280 io = pyio
3281
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003282 # Handling reentrancy issues would slow down _pyio even more, so the
3283 # tests are disabled.
3284 test_reentrant_write_buffered = None
3285 test_reentrant_write_text = None
3286
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003287
Ezio Melottidaa42c72013-03-23 16:30:16 +02003288def load_tests(*args):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003289 tests = (CIOTest, PyIOTest,
3290 CBufferedReaderTest, PyBufferedReaderTest,
3291 CBufferedWriterTest, PyBufferedWriterTest,
3292 CBufferedRWPairTest, PyBufferedRWPairTest,
3293 CBufferedRandomTest, PyBufferedRandomTest,
3294 StatefulIncrementalDecoderTest,
3295 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3296 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003297 CMiscIOTest, PyMiscIOTest,
3298 CSignalsTest, PySignalsTest,
3299 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003300
3301 # Put the namespaces of the IO module we are testing and some useful mock
3302 # classes in the __dict__ of each test.
3303 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003304 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003305 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3306 c_io_ns = {name : getattr(io, name) for name in all_members}
3307 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3308 globs = globals()
3309 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3310 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3311 # Avoid turning open into a bound method.
3312 py_io_ns["open"] = pyio.OpenWrapper
3313 for test in tests:
3314 if test.__name__.startswith("C"):
3315 for name, obj in c_io_ns.items():
3316 setattr(test, name, obj)
3317 elif test.__name__.startswith("Py"):
3318 for name, obj in py_io_ns.items():
3319 setattr(test, name, obj)
3320
Ezio Melottidaa42c72013-03-23 16:30:16 +02003321 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
3322 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003323
3324if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02003325 unittest.main()