blob: d5b274c85ea3f2445f95d9007c945291a6110554 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Serhiy Storchaka78980432013-01-15 01:12:17 +020035import _testcapi
Antoine Pitrou131a4892012-10-16 22:57:11 +020036from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000038from test import support
Antoine Pitrou712cb732013-12-21 15:51:54 +010039from test.script_helper import assert_python_ok
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000040
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000041import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042import io # C implementation of io
43import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000044try:
45 import threading
46except ImportError:
47 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010048try:
49 import fcntl
50except ImportError:
51 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000052
def _default_chunk_size():
    """Return the ``_CHUNK_SIZE`` attribute of a text stream from open().

    This very file is opened in text mode (decoded as latin-1) purely to
    obtain a text stream whose attribute can be read.
    """
    with open(__file__, "r", encoding="latin-1") as stream:
        chunk_size = stream._CHUNK_SIZE
    return chunk_size
57
58
class MockRawIOWithoutRead:
    """Raw I/O stand-in that deliberately provides no read() method.

    Leaving read() out makes the default RawIO.read(), which calls
    readinto(), the code path being exercised.
    """

    def __init__(self, read_stack=()):
        # Items still to be handed out by readinto(), oldest first.
        self._read_stack = list(read_stack)
        # Every chunk passed to write(), stored as bytes.
        self._write_stack = []
        # Number of readinto() calls, and how many of them found no data.
        self._reads = 0
        self._extraneous_reads = 0

    # -- capabilities: everything is reported as supported ---------------

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    # -- positioning: fixed dummy answers --------------------------------

    def seek(self, pos, whence):
        # Not a real position, but something has to be returned.
        return 0

    def tell(self):
        # Same remark as for seek().
        return 0

    def truncate(self, pos=None):
        return pos

    # -- data transfer ----------------------------------------------------

    def write(self, b):
        """Record a bytes copy of *b* and return len(b)."""
        self._write_stack.append(bytes(b))
        return len(b)

    def readinto(self, buf):
        """Copy the next queued item into *buf*.

        Returns the number of bytes copied, 0 when the queue is empty, or
        None when the next queued item is None.  An item longer than *buf*
        is delivered piecewise: the remainder stays at the head of the queue.
        """
        self._reads += 1
        capacity = len(buf)
        pending = self._read_stack
        if not pending:
            self._extraneous_reads += 1
            return 0
        chunk = pending[0]
        if chunk is None:
            pending.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Fill the whole buffer and keep the rest for the next call.
            buf[:] = chunk[:capacity]
            pending[0] = chunk[capacity:]
            return capacity
        pending.pop(0)
        buf[:size] = chunk
        return size
114
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead combined with io.RawIOBase."""
117
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead combined with pyio.RawIOBase."""
120
121
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit read() fed from the read stack."""

    def read(self, n=None):
        """Pop and return the next queued item; *n* is ignored.

        Every call is counted in ``_reads``.  When the queue is empty the
        call is also counted in ``_extraneous_reads`` and b"" is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue counts as an extraneous read.  A bare
            # "except:" here would also hide KeyboardInterrupt/SystemExit
            # and genuine bugs.
            self._extraneous_reads += 1
            return b""
131
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with io.RawIOBase."""
134
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with pyio.RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000137
Guido van Rossuma9e20242007-03-08 00:43:48 +0000138
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately wrong results."""

    def write(self, b):
        # Report twice the count returned by the parent write().
        written = super().write(b)
        return written * 2

    def read(self, n=None):
        # Hand back the parent's result repeated twice.
        data = super().read(n)
        return data * 2

    def seek(self, pos, whence):
        # A negative value instead of a position.
        return -123

    def tell(self):
        # A negative value instead of a position.
        return -456

    def readinto(self, buf):
        # Let the parent do its work, then report five times the buffer size.
        super().readinto(buf)
        bogus_count = len(buf) * 5
        return bogus_count
155
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with io.RawIOBase."""
158
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with pyio.RawIOBase."""
161
162
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() call raises OSError."""

    # Falsy until close() has been called once.
    closed = 0

    def close(self):
        # Later calls are silent no-ops.
        if self.closed:
            return
        # Mark as closed *before* raising.
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000170
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with io.RawIOBase."""
173
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with pyio.RawIOBase."""
176
177
class MockFileIO:
    """Mixin that logs the outcome of each read()/readinto() call.

    It relies on cooperative super() calls, so it has to be combined with a
    class that actually implements __init__(data), read() and readinto().
    """

    def __init__(self, data):
        # One entry per read()/readinto() call, in call order.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        """Delegate to the next class and log the result length (or None)."""
        data = super().read(n)
        if data is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(data))
        return data

    def readinto(self, b):
        """Delegate to the next class and log whatever it returned."""
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000193
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO combined with io.BytesIO."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000196
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO combined with pyio.BytesIO."""
199
200
class MockUnseekableIO:
    """Mixin that turns a stream into a non-seekable one.

    The exception class is read from ``self.UnsupportedOperation``, which
    this mixin does not define itself.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        # Arguments are accepted but never looked at.
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        # Arguments are accepted but never looked at.
        error = self.UnsupportedOperation("not seekable")
        raise error
210
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO combined with io.BytesIO."""

    # Exception class raised by the mixin's seek() and tell().
    UnsupportedOperation = io.UnsupportedOperation
213
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO combined with pyio.BytesIO."""

    # Exception class raised by the mixin's seek() and tell().
    UnsupportedOperation = pyio.UnsupportedOperation
216
217
class MockNonBlockWriterIO:
    """Writer mock that can pretend a write would block.

    After block_on(char), a write() whose data contains *char* is cut short
    just before it; a write() whose data *starts* with it returns None and
    disarms the blocker.
    """

    def __init__(self):
        # Chunks accepted by write(), in order.
        self._write_stack = []
        # Byte sequence that triggers the partial / refused write, if any.
        self._blocker_char = None

    def pop_written(self):
        """Return everything accepted so far as one bytes object and reset."""
        written = b"".join(self._write_stack)
        self._write_stack.clear()
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Accept *b* fully, partially, or not at all (returning None)."""
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                position = payload.index(blocker)
            except ValueError:
                # Blocker absent from this payload: accept all of it below.
                position = None
            if position is not None:
                if position > 0:
                    # write data up to the first blocker
                    self._write_stack.append(payload[:position])
                    return position
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)
261
262class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
263 BlockingIOError = io.BlockingIOError
264
265class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
266 BlockingIOError = pyio.BlockingIOError
267
Guido van Rossuma9e20242007-03-08 00:43:48 +0000268
class IOTest(unittest.TestCase):
    """Tests written against whichever io implementation is bound to self.

    The methods look up ``open``, ``FileIO``, ``BytesIO``, ``StringIO``,
    ``IOBase``, ``RawIOBase``, ``BufferedIOBase``, ``TextIOBase``,
    ``UnsupportedOperation``, ``SEEK_CUR``, ``SEEK_END`` and
    ``MockRawIOWithoutRead`` on ``self``.  None of them is defined in this
    class, so they must be supplied from outside before the tests run
    (presumably once for the C and once for the Python implementation --
    confirm against the code that sets them up).
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/tell/truncate sequence on binary stream *f*.

        On success the stream content is b"hello world\\n".
        """
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must not move the stream position.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        # Again, the position is unaffected by truncate().
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/readinto/seek sequence on binary stream *f*.

        *f* must contain b"hello world\\n".  With *buffered* true, argument-less
        read() calls are checked as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by large_file_ops(): just beyond the signed 32-bit range.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write, truncate and read around offset LARGE on stream *f*."""
        # Real assertions rather than bare ``assert`` statements, so the
        # preconditions are still checked when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # truncate() must not move the stream position.
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_open_handles_NUL_chars(self):
        # A NUL in the file name (str or bytes) must be rejected with
        # TypeError.
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
        self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')

    def test_raw_file_io(self):
        # buffering=0: the unbuffered (raw) file object.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform.startswith('win') or sys.platform == 'darwin':
            support.requires(
                'largefile',
                'test requires %s bytes and a long time to run' % self.LARGE)
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed when the with block ends, normally or
        # through an exception.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            # assertGreater reports the actual value on failure.
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Collecting an unclosed FileIO subclass must call __del__, close()
        # and flush(), in that order, and the written data must reach disk.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check the __del__ -> close() -> flush() order for a *base* subclass."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must accept an array and report its size in bytes.
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Self-reference: the object is now part of a reference cycle.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise OSError()
        f.flush = bad_flush
        self.assertRaises(OSError, f.close) # exception not swallowed
        self.assertTrue(f.closed)

    def test_multi_close(self):
        # Closing repeatedly is allowed; using the closed file is not.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))

    def test_opener(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        fd = os.open(support.TESTFN, os.O_RDONLY)
        def opener(path, flags):
            # Ignore the requested path and hand back the descriptor
            # opened above.
            return fd
        with self.open("non-existent", "r", opener=opener) as f:
            self.assertEqual(f.read(), "egg\n")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()
656
657
class CIOTest(IOTest):
    """IOTest plus one regression test that exists only in this subclass."""

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        # Self-reference: the instance is now part of a reference cycle.
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        # assertIsNone reports the surviving object if the check fails.
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000677
class PyIOTest(IOTest):
    """IOTest unchanged; adds no tests of its own."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000680
Guido van Rossuma9e20242007-03-08 00:43:48 +0000681
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000682class CommonBufferedTests:
683 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
684
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000685 def test_detach(self):
686 raw = self.MockRawIO()
687 buf = self.tp(raw)
688 self.assertIs(buf.detach(), raw)
689 self.assertRaises(ValueError, buf.detach)
690
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000691 def test_fileno(self):
692 rawio = self.MockRawIO()
693 bufio = self.tp(rawio)
694
Ezio Melottib3aedd42010-11-20 19:04:17 +0000695 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000696
    @unittest.skip('test having existential crisis')
    def test_no_fileno(self):
        # Placeholder: always skipped by the decorator above, and the body
        # checks nothing.
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass
702
703 def test_invalid_args(self):
704 rawio = self.MockRawIO()
705 bufio = self.tp(rawio)
706 # Invalid whence
707 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200708 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000709
710 def test_override_destructor(self):
711 tp = self.tp
712 record = []
713 class MyBufferedIO(tp):
714 def __del__(self):
715 record.append(1)
716 try:
717 f = super().__del__
718 except AttributeError:
719 pass
720 else:
721 f()
722 def close(self):
723 record.append(2)
724 super().close()
725 def flush(self):
726 record.append(3)
727 super().flush()
728 rawio = self.MockRawIO()
729 bufio = MyBufferedIO(rawio)
730 writable = bufio.writable()
731 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000732 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000733 if writable:
734 self.assertEqual(record, [1, 2, 3])
735 else:
736 self.assertEqual(record, [1, 2])
737
738 def test_context_manager(self):
739 # Test usability as a context manager
740 rawio = self.MockRawIO()
741 bufio = self.tp(rawio)
742 def _with():
743 with bufio:
744 pass
745 _with()
746 # bufio should now be closed, and using it a second time should raise
747 # a ValueError.
748 self.assertRaises(ValueError, _with)
749
750 def test_error_through_destructor(self):
751 # Test that the exception state is not modified by a destructor,
752 # even if close() fails.
753 rawio = self.CloseFailureIO()
754 def f():
755 self.tp(rawio).xyzzy
756 with support.captured_output("stderr") as s:
757 self.assertRaises(AttributeError, f)
758 s = s.getvalue().strip()
759 if s:
760 # The destructor *may* have printed an unraisable error, check it
761 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200762 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000763 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000764
Antoine Pitrou716c4442009-05-23 19:04:03 +0000765 def test_repr(self):
766 raw = self.MockRawIO()
767 b = self.tp(raw)
768 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
769 self.assertEqual(repr(b), "<%s>" % clsname)
770 raw.name = "dummy"
771 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
772 raw.name = b"dummy"
773 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
774
Antoine Pitrou6be88762010-05-03 16:48:20 +0000775 def test_flush_error_on_close(self):
776 raw = self.MockRawIO()
777 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200778 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000779 raw.flush = bad_flush
780 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200781 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600782 self.assertTrue(b.closed)
783
784 def test_close_error_on_close(self):
785 raw = self.MockRawIO()
786 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200787 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -0600788 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200789 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -0600790 raw.close = bad_close
791 b = self.tp(raw)
792 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200793 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600794 b.close()
795 self.assertEqual(err.exception.args, ('close',))
796 self.assertEqual(err.exception.__context__.args, ('flush',))
797 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000798
799 def test_multi_close(self):
800 raw = self.MockRawIO()
801 b = self.tp(raw)
802 b.close()
803 b.close()
804 b.close()
805 self.assertRaises(ValueError, b.flush)
806
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000807 def test_unseekable(self):
808 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
809 self.assertRaises(self.UnsupportedOperation, bufio.tell)
810 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
811
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000812 def test_readonly_attributes(self):
813 raw = self.MockRawIO()
814 buf = self.tp(raw)
815 x = self.MockRawIO()
816 with self.assertRaises(AttributeError):
817 buf.raw = x
818
Guido van Rossum78892e42007-04-06 17:31:18 +0000819
class SizeofTest:
    # Mixin checking how sys.getsizeof() accounts for the internal buffer
    # of the buffered class stored in `self.tp`.

    @support.cpython_only
    def test_sizeof(self):
        # Two objects that differ only in buffer_size must differ in
        # reported size by exactly the difference in buffer size.
        small_size = 4096
        large_size = 8192
        small_buffered = self.tp(self.MockRawIO(), buffer_size=small_size)
        base_size = sys.getsizeof(small_buffered) - small_size
        large_buffered = self.tp(self.MockRawIO(), buffer_size=large_size)
        self.assertEqual(sys.getsizeof(large_buffered),
                         base_size + large_size)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the buffer must no longer be counted in the
        # reported size.
        buffer_size = 4096
        buffered = self.tp(self.MockRawIO(), buffer_size=buffer_size)
        base_size = sys.getsizeof(buffered) - buffer_size
        buffered.close()
        self.assertEqual(sys.getsizeof(buffered), base_size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200841
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests for the buffered reader class that concrete subclasses bind
    # to `tp`.
    read_mode = "rb"

    def test_constructor(self):
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        # Re-running __init__() on a live object must be accepted, with or
        # without an explicit buffer size.
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        # Zero and negative buffer sizes are rejected.
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # A valid re-initialization afterwards leaves a usable object.
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        # read(None) and a read of exactly the available size both return
        # the concatenation of all raw chunks.
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # The assertions on rawio._reads pin how many raw reads each
        # read1() call is allowed to trigger.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # A short read only overwrites the prefix of the target buffer.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        # At EOF nothing is written and 0 is returned.
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")
        # Same checks when the raw stream yields None after its data.
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

    def test_readlines(self):
        # Each call gets a fresh buffered reader over the same raw chunks.
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to bufio.read(), and the
        # raw read sizes expected in rawio.read_history.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # The raw mock's own readall() shows the same None behaviour.
        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            s = bytes(values)
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        # Record the failure for the main thread's check.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Every byte value must have been read exactly N times,
                # whichever thread happened to read it.
                s = b''.join(results)
                for i in range(256):
                    self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        # The same must hold after some data has been read.
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        # With a misbehaving raw stream, seek() and tell() must raise
        # OSError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1037
1038
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the BufferedReader tests against io.BufferedReader and adds
    # checks specific to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__() the object must refuse to read
        # (ValueError).
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The FileIO below creates TESTFN on disk; remove it when the test
        # finishes so it does not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            # Self-reference: only the cyclic collector can free the object.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        # The TypeError for too many arguments must name BufferedReader.
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1085
1086
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited BufferedReaderTest suite against the
    # BufferedReader class of the module bound to `pyio`.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001089
Guido van Rossuma9e20242007-03-08 00:43:48 +00001090
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Tests for the buffered writer class that concrete subclasses bind
    # to `tp`.
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-running __init__() on a live object must be accepted, with or
        # without an explicit buffer size.
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        # Zero and negative buffer sizes are rejected.
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # A valid re-initialization afterwards leaves a usable object.
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push buffered data to the raw stream first.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Helper, not a test: write a large payload in growing chunks,
        # calling intermediate_func(bufio) after every write, then check
        # that the raw stream received exactly the payload.
        # Lots of writes, test the flushed output is as expected.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Clamp the last chunk to what is left of the payload.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seeking away and back between writes must not disturb the output,
        # with absolute (whence=0) or relative (whence=1) seeks.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        # Seeking away must flush the overwrite into the raw stream.
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # writelines() must accept list-like objects, not just lists.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items, a non-iterable and a str are all rejected.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Destroying the writer must flush pending data to the raw stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN on disk; remove it when the test finishes
        # so it does not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # truncate() must not move the stream position.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        # Record the failure for the main thread's check.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunk order is arbitrary, but every byte value must appear
            # exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # With a misbehaving raw stream, seek(), tell() and an overflowing
        # write() must raise OSError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument (the former max_buffer_size) is no
        # longer accepted.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        # A failing raw write() during close() must surface, and the
        # buffered object must still end up closed.
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close) # exception not swallowed
        self.assertTrue(b.closed)
1338
Benjamin Peterson59406a92009-03-26 17:10:29 +00001339
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the BufferedWriter tests against io.BufferedWriter and adds
    # checks specific to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__() the object must refuse to write
        # (ValueError).
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The FileIO below creates TESTFN on disk; remove it when the test
        # finishes so it does not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            # Self-reference: only the cyclic collector can free the object.
            f.x = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        # The data written before collection must have reached the file.
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        # The TypeError for too many arguments must name BufferedWriter.
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1383
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001384
class PyBufferedWriterTest(BufferedWriterTest):
    # Binds the shared BufferedWriterTest cases to pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001387
class BufferedRWPairTest(unittest.TestCase):
    # Shared tests for BufferedRWPair.  Concrete subclasses provide ``tp``
    # (the pair class under test); MockRawIO, BytesIO and
    # UnsupportedOperation are looked up on ``self`` as well.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is no longer accepted.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # None must behave like an omitted size.
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # Every assertion gets a fresh pair so that none of them depends on
        # the position left behind by the previous one.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # The original repeated the no-argument call here; check the
        # documented hint=None spelling instead (same expected result).
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        # Each flush() is expected to hand one chunk to the writer side.
        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than requested, hence startswith(); it
        # must not consume data, so the following read() still sees "abc".
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            # Raw mock whose isatty() answer is fixed at construction time.
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side does.
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001504
class CBufferedRWPairTest(BufferedRWPairTest):
    # Binds the shared BufferedRWPairTest cases to io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001507
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Binds the shared BufferedRWPairTest cases to pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001510
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001511
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared tests for a buffered object that is read, written and seeked
    # through the same instance.  Concrete subclasses provide ``tp``.
    # NOTE(review): read_mode/write_mode are not referenced in this class;
    # presumably the inherited reader/writer tests consume them.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run both parents' constructor checks explicitly.
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        # By the time read() returns, both pending writes are expected to
        # have reached the raw object as one chunk.
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite four bytes starting at offset 4, then re-read everything.
        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        # whence=2: relative to the end (9 - 4 == 5).
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        # whence=1: relative to the current position (5 + 2 == 7).
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        # Asking for more than is left returns only the remaining bytes.
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Helper shared by the test_flush_and_* tests.  ``read_func`` is
        # called as read_func(bufio, n) or read_func(bufio) and must return
        # the bytes it consumed.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        # The write lands at offsets 2-3, so the next read starts at 4.
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        # flush() must not move the logical position.
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # Adapter giving readinto() a read()-like signature; a negative
            # n uses a 9999-byte scratch buffer.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance, so consume the bytes explicitly.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        # Two flushed writes must land back to back at offsets 0-4.
        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        # Run both parents' threading checks explicitly.
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        # check_writes() is not defined in this class (presumably inherited);
        # it receives a callback that takes the buffered object.
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        # Second variant: step back one byte, peek, then restore the position.
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            # Step back one byte and read it again.
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            # Same as above, using read1().
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            # Same as above, using readinto() with a one-byte buffer.
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        # With a buffer size of 4, the two sizes cover a write smaller and a
        # write larger than the buffer.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        b = b"\x80\x81\x82\x83\x84"
        for i in range(0, len(b)):
            for j in range(i, len(b)):
                raw = self.BytesIO(b)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # Byte 2 is expected at j and byte 1 at i; when i == j the
                # later write (1) wins, matching the assignment order below.
                expected = bytearray(b)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        # truncate() without an argument is expected to cut at the logical
        # position, whether it was reached by reading or by writing.
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        # Run both parents' misbehaved-raw-object checks explicitly.
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        # Alternate one-byte writes with each read primitive; every write
        # must land right after the byte that was just read.
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as f:
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                buf = bytearray(1)
                f.readinto(buf)
                self.assertEqual(buf, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        # Same idea, starting with a read instead of a write.
        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Each write replaces the first byte of a line; readline() then
        # returns the rest of that line.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as f:
                f.write(b'1')
                self.assertEqual(f.readline(), b'b\n')
                f.write(b'2')
                self.assertEqual(f.readline(), b'def\n')
                f.write(b'3')
                self.assertEqual(f.readline(), b'\n')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    # Setting the name to None replaces any inherited test_unseekable.
    test_unseekable = None
1735
R David Murray67bfe802013-02-23 21:51:05 -05001736
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Binds the shared BufferedRandomTest cases to io.BufferedRandom and
    # adds checks that are only defined for this variant.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        if sys.maxsize <= 0x7FFFFFFF:
            # The allocation can succeed on 32-bit builds, e.g. with more
            # than 2GB RAM and a 64-bit kernel, so there is nothing to check.
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            buffered.__init__(raw, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the reader-side and writer-side collection checks.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275: the error message has to name the class.
        too_many_args = (io.BytesIO(), 1024, 1024, 1024)
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(*too_many_args)
1758
1759
class PyBufferedRandomTest(BufferedRandomTest):
    # Binds the shared BufferedRandomTest cases to pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1762
1763
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001764# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1765# properties:
1766# - A single output character can correspond to many bytes of input.
1767# - The number of input bytes to complete the character can be
1768# undetermined until the last input byte is received.
1769# - The number of input bytes can vary depending on previous input.
1770# - A single input byte can correspond to many characters of output.
1771# - The number of output characters can be undetermined until the
1772# last input byte is received.
1773# - The number of output characters can vary depending on previous input.
1774
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    Stateful, buffering decoder for exercising seek/tell behavior.

    The input is read as a sequence of words.  A word is either
    fixed-length (I bytes long) or, when I is 0, variable-length and
    terminated by a period; in that mode extra periods are ignored.
    Words are interpreted as follows:
      - 'i' followed by a number sets the input length, I (maximum 99).
        Setting I to 0 switches to period-terminated words.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is emitted followed by a period.  If O is non-zero
        the word is first truncated or padded with hyphens to exactly O
        characters; if O is 0 it is emitted verbatim.
    I and O both start at 1.  EOF (final=True) also terminates the last
    word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.buffer = bytearray()
        self.i = self.o = 1

    def getstate(self):
        """Return (pending bytes, flags); flags is 0 right after reset()."""
        # XOR with 1 maps the initial lengths (1, 1) onto (0, 0).
        in_flag = self.i ^ 1
        out_flag = self.o ^ 1
        return bytes(self.buffer), in_flag * 100 + out_flag

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        pending, flags = state
        in_flag, out_flag = divmod(flags, 100)
        self.buffer = bytearray(pending)
        self.i = in_flag ^ 1
        self.o = out_flag ^ 1

    def decode(self, input, final=False):
        """Feed *input* (bytes) and return the text for all completed words."""
        period = ord('.')
        pieces = []
        for byte in input:
            # self.i is re-read for every byte because process_word() may
            # change it in the middle of the input.
            if self.i != 0:
                # fixed-length: the word ends after self.i bytes
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != period:
                # variable-length: keep collecting until a period shows up
                self.buffer.append(byte)
            elif self.buffer:
                # a period ends a non-empty word; stray periods are ignored
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces."""
        marker = self.buffer[0]
        if marker == ord('i'):
            # set input length
            self.i = min(99, int(self.buffer[1:] or 0))
            result = ''
        elif marker == ord('o'):
            # set output length
            self.o = min(99, int(self.buffer[1:] or 0))
            result = ''
        else:
            text = self.buffer.decode('ascii')
            if self.o:
                # pad with hyphens, then truncate, to exactly self.o chars
                text = text.ljust(self.o, '-')[:self.o]
            result = text + '.'
        self.buffer = bytearray()
        return result

    # The lookup function below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: serve 'test_decoder' while enabled."""
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
1859
# Register the decoder defined above as a codec search function.
# The lookup stays inert until a test sets
# StatefulIncrementalDecoder.codecEnabled to a true value.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1863
1864
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Sanity checks for the StatefulIncrementalDecoder helper itself.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.')
        ]

    def test_decoder(self):
        # One-shot decoding of every table entry, each with a fresh decoder.
        for raw, at_eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(raw, at_eof), expected)

        # An unfinished word stays buffered until EOF is forced.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001907
1908class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001909
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001910 def setUp(self):
1911 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1912 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001913 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001914
    def tearDown(self):
        # Remove the scratch file a test may have created.
        support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001917
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001918 def test_constructor(self):
1919 r = self.BytesIO(b"\xc3\xa9\n\n")
1920 b = self.BufferedReader(r, 1000)
1921 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001922 t.__init__(b, encoding="latin-1", newline="\r\n")
1923 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001924 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001925 t.__init__(b, encoding="utf-8", line_buffering=True)
1926 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001927 self.assertEqual(t.line_buffering, True)
1928 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001929 self.assertRaises(TypeError, t.__init__, b, newline=42)
1930 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1931
Nick Coghlana9b15242014-02-04 22:11:18 +10001932 def test_non_text_encoding_codecs_are_rejected(self):
1933 # Ensure the constructor complains if passed a codec that isn't
1934 # marked as a text encoding
1935 # http://bugs.python.org/issue20404
1936 r = self.BytesIO()
1937 b = self.BufferedWriter(r)
1938 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
1939 self.TextIOWrapper(b, encoding="hex")
1940
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001941 def test_detach(self):
1942 r = self.BytesIO()
1943 b = self.BufferedWriter(r)
1944 t = self.TextIOWrapper(b)
1945 self.assertIs(t.detach(), b)
1946
1947 t = self.TextIOWrapper(b, encoding="ascii")
1948 t.write("howdy")
1949 self.assertFalse(r.getvalue())
1950 t.detach()
1951 self.assertEqual(r.getvalue(), b"howdy")
1952 self.assertRaises(ValueError, t.detach)
1953
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001954 def test_repr(self):
1955 raw = self.BytesIO("hello".encode("utf-8"))
1956 b = self.BufferedReader(raw)
1957 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001958 modname = self.TextIOWrapper.__module__
1959 self.assertEqual(repr(t),
1960 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1961 raw.name = "dummy"
1962 self.assertEqual(repr(t),
1963 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001964 t.mode = "r"
1965 self.assertEqual(repr(t),
1966 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001967 raw.name = b"dummy"
1968 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001969 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001970
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001971 def test_line_buffering(self):
1972 r = self.BytesIO()
1973 b = self.BufferedWriter(r, 1000)
1974 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001975 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001976 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001977 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001978 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001979 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001980 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001981
Victor Stinnerf86a5e82012-06-05 13:43:22 +02001982 def test_default_encoding(self):
1983 old_environ = dict(os.environ)
1984 try:
1985 # try to get a user preferred encoding different than the current
1986 # locale encoding to check that TextIOWrapper() uses the current
1987 # locale encoding and not the user preferred encoding
1988 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
1989 if key in os.environ:
1990 del os.environ[key]
1991
1992 current_locale_encoding = locale.getpreferredencoding(False)
1993 b = self.BytesIO()
1994 t = self.TextIOWrapper(b)
1995 self.assertEqual(t.encoding, current_locale_encoding)
1996 finally:
1997 os.environ.clear()
1998 os.environ.update(old_environ)
1999
Serhiy Storchaka78980432013-01-15 01:12:17 +02002000 # Issue 15989
2001 def test_device_encoding(self):
2002 b = self.BytesIO()
2003 b.fileno = lambda: _testcapi.INT_MAX + 1
2004 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2005 b.fileno = lambda: _testcapi.UINT_MAX + 1
2006 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2007
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002008 def test_encoding(self):
2009 # Check the encoding attribute is always set, and valid
2010 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002011 t = self.TextIOWrapper(b, encoding="utf-8")
2012 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002013 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002014 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002015 codecs.lookup(t.encoding)
2016
2017 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002018 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002019 b = self.BytesIO(b"abc\n\xff\n")
2020 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002021 self.assertRaises(UnicodeError, t.read)
2022 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002023 b = self.BytesIO(b"abc\n\xff\n")
2024 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002025 self.assertRaises(UnicodeError, t.read)
2026 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002027 b = self.BytesIO(b"abc\n\xff\n")
2028 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002029 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002030 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002031 b = self.BytesIO(b"abc\n\xff\n")
2032 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002033 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002034
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002035 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002036 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002037 b = self.BytesIO()
2038 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002039 self.assertRaises(UnicodeError, t.write, "\xff")
2040 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002041 b = self.BytesIO()
2042 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002043 self.assertRaises(UnicodeError, t.write, "\xff")
2044 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002045 b = self.BytesIO()
2046 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002047 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002048 t.write("abc\xffdef\n")
2049 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002050 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002051 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002052 b = self.BytesIO()
2053 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002054 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002055 t.write("abc\xffdef\n")
2056 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002057 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002058
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002059 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002060 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2061
2062 tests = [
2063 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002064 [ '', input_lines ],
2065 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2066 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2067 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002068 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002069 encodings = (
2070 'utf-8', 'latin-1',
2071 'utf-16', 'utf-16-le', 'utf-16-be',
2072 'utf-32', 'utf-32-le', 'utf-32-be',
2073 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002074
Guido van Rossum8358db22007-08-18 21:39:55 +00002075 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002076 # character in TextIOWrapper._pending_line.
2077 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002078 # XXX: str.encode() should return bytes
2079 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002080 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002081 for bufsize in range(1, 10):
2082 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002083 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2084 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002085 encoding=encoding)
2086 if do_reads:
2087 got_lines = []
2088 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002089 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002090 if c2 == '':
2091 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002092 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002093 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002094 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002095 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002096
2097 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002098 self.assertEqual(got_line, exp_line)
2099 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002100
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002101 def test_newlines_input(self):
2102 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002103 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2104 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002105 (None, normalized.decode("ascii").splitlines(keepends=True)),
2106 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002107 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2108 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2109 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002110 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002111 buf = self.BytesIO(testdata)
2112 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002113 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002114 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002115 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002116
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002117 def test_newlines_output(self):
2118 testdict = {
2119 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2120 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2121 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2122 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2123 }
2124 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2125 for newline, expected in tests:
2126 buf = self.BytesIO()
2127 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2128 txt.write("AAA\nB")
2129 txt.write("BB\nCCC\n")
2130 txt.write("X\rY\r\nZ")
2131 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002132 self.assertEqual(buf.closed, False)
2133 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002134
def test_destructor(self):
    # Collecting the wrapper must close the underlying buffer, and the
    # pending text must already be in the buffer when close() runs.
    seen_at_close = []
    parent = self.BytesIO

    class RecordingBytesIO(parent):
        def close(self):
            # Capture the contents before the parent discards them.
            seen_at_close.append(self.getvalue())
            parent.close(self)

    raw = RecordingBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002148
def test_override_destructor(self):
    # A subclass overriding __del__, close() and flush() must see them
    # invoked in that order when the wrapper is collected.
    calls = []

    class RecordingTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The parent may or may not define __del__.
            inherited = getattr(super(), "__del__", None)
            if inherited is not None:
                inherited()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = RecordingTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2171
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    failing_raw = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(failing_raw).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    report = captured.getvalue().strip()
    if not report:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(report.splitlines()), 1)
    self.assertTrue(report.startswith("Exception OSError: "), report)
    self.assertTrue(report.endswith(" ignored"), report)
Guido van Rossum8358db22007-08-18 21:39:55 +00002186
Guido van Rossum9b76da62007-04-11 01:09:03 +00002187 # Systematic tests of the text I/O API
2188
def test_basic_io(self):
    # Systematic write/read/seek/tell checks over several chunk sizes and
    # encodings.  Both files are managed by ``with`` blocks so that a
    # failing assertion cannot leave a handle open on support.TESTFN.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position just past the three characters written above.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2217
def multi_line_test(self, f, enc):
    # Empty *f*, write lines of varying length while recording the tell()
    # position before each one, then read them back and check that both
    # the positions and the text round-trip.  *enc* is not used here.
    f.seek(0)
    f.truncate()
    alphabet = "s\xff\u0fff\uffff"
    written = []
    for length in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        text = "".join([alphabet[k % len(alphabet)]
                        for k in range(length)]) + "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    recovered = []
    while True:
        where = f.tell()
        text = f.readline()
        if not text:
            break
        recovered.append((where, text))
    self.assertEqual(recovered, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002239
def test_telling(self):
    # tell() must report the positions recorded while writing, and must
    # be refused while the file is being iterated.  The file is managed
    # by a ``with`` block so it is closed even when an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail in the middle of iteration.
            self.assertRaises(OSError, f.tell)
        self.assertEqual(f.tell(), p2)
2259
def test_seeking(self):
    # Build a line whose multi-byte suffix starts two characters before
    # the default chunk size, then check tell()/readline() right after
    # reading the ASCII prefix.
    prefix_size = _default_chunk_size() - 2
    text_prefix = "a" * prefix_size
    byte_prefix = text_prefix.encode("utf-8")
    self.assertEqual(len(text_prefix), len(byte_prefix))
    text_suffix = "\u8888\n"
    one_line = byte_prefix + text_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as sink:
        sink.write(one_line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        head = reader.read(prefix_size)
        self.assertEqual(head, str(byte_prefix, "ascii"))
        self.assertEqual(reader.tell(), prefix_size)
        self.assertEqual(reader.readline(), text_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002276
def test_seeking_too(self):
    # Regression test for a specific bug
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as sink:
        sink.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        reader._CHUNK_SIZE  # Just test that it exists
        reader._CHUNK_SIZE = 2
        # Passing means readline() followed by tell() does not raise.
        reader.readline()
        reader.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002287
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_seek_and_tell(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened through ``with`` so that a failing
        # assertion does not leak an open handle on support.TESTFN.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN, encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_seek_and_tell(data)

        # Position each test case so that it crosses a chunk boundary.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(data)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            check_seek_and_tell(prefix + data, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002334
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002335 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002336 data = "1234567890"
2337 tests = ("utf-16",
2338 "utf-16-le",
2339 "utf-16-be",
2340 "utf-32",
2341 "utf-32-le",
2342 "utf-32-be")
2343 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002344 buf = self.BytesIO()
2345 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002346 # Check if the BOM is written only once (see issue1753).
2347 f.write(data)
2348 f.write(data)
2349 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002350 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002351 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002352 self.assertEqual(f.read(), data * 2)
2353 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002354
Benjamin Petersona1b49012009-03-31 23:11:32 +00002355 def test_unreadable(self):
2356 class UnReadable(self.BytesIO):
2357 def readable(self):
2358 return False
2359 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002360 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002361
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002362 def test_read_one_by_one(self):
2363 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002364 reads = ""
2365 while True:
2366 c = txt.read(1)
2367 if not c:
2368 break
2369 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002370 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002371
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002372 def test_readlines(self):
2373 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2374 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2375 txt.seek(0)
2376 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2377 txt.seek(0)
2378 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2379
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002380 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002381 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002382 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002383 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002384 reads = ""
2385 while True:
2386 c = txt.read(128)
2387 if not c:
2388 break
2389 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002390 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002391
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002392 def test_writelines(self):
2393 l = ['ab', 'cd', 'ef']
2394 buf = self.BytesIO()
2395 txt = self.TextIOWrapper(buf)
2396 txt.writelines(l)
2397 txt.flush()
2398 self.assertEqual(buf.getvalue(), b'abcdef')
2399
2400 def test_writelines_userlist(self):
2401 l = UserList(['ab', 'cd', 'ef'])
2402 buf = self.BytesIO()
2403 txt = self.TextIOWrapper(buf)
2404 txt.writelines(l)
2405 txt.flush()
2406 self.assertEqual(buf.getvalue(), b'abcdef')
2407
2408 def test_writelines_error(self):
2409 txt = self.TextIOWrapper(self.BytesIO())
2410 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2411 self.assertRaises(TypeError, txt.writelines, None)
2412 self.assertRaises(TypeError, txt.writelines, b'abc')
2413
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002414 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002415 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002416
2417 # read one char at a time
2418 reads = ""
2419 while True:
2420 c = txt.read(1)
2421 if not c:
2422 break
2423 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002424 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002425
2426 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002427 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002428 txt._CHUNK_SIZE = 4
2429
2430 reads = ""
2431 while True:
2432 c = txt.read(4)
2433 if not c:
2434 break
2435 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002436 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002437
2438 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002439 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002440 txt._CHUNK_SIZE = 4
2441
2442 reads = txt.read(4)
2443 reads += txt.read(4)
2444 reads += txt.readline()
2445 reads += txt.readline()
2446 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002447 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002448
2449 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002450 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002451 txt._CHUNK_SIZE = 4
2452
2453 reads = txt.read(4)
2454 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002455 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002456
2457 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002458 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002459 txt._CHUNK_SIZE = 4
2460
2461 reads = txt.read(4)
2462 pos = txt.tell()
2463 txt.seek(0)
2464 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002465 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002466
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002467 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002468 buffer = self.BytesIO(self.testdata)
2469 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002470
2471 self.assertEqual(buffer.seekable(), txt.seekable())
2472
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # tell() is still called as before; its result was never used,
            # so it is no longer bound to a name.
            f.tell()
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002487
def test_seek_bom(self):
    # The BOM must not be written again when the write position is
    # reached by seeking manually.
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as writer:
            writer.write('aaa')
            end_of_text = writer.tell()
        with self.open(path, 'r+', encoding=codec) as updater:
            # Append after the existing text, then overwrite the start.
            updater.seek(end_of_text)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'bbbzzz'.encode(codec))
Antoine Pitroue4501852009-05-14 18:55:55 +00002502
def test_errors_property(self):
    # ``errors`` reads "strict" when not given, otherwise the value passed.
    cases = (({}, "strict"), ({"errors": "replace"}, "replace"))
    for extra, wanted in cases:
        with self.open(support.TESTFN, "w", **extra) as f:
            self.assertEqual(f.errors, wanted)
2508
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def worker(index):
            message = "Thread%03d\n" % index
            # All workers block here until the event is set below.
            start.wait()
            f.write(message)

        pool = [threading.Thread(target=worker, args=(index,))
                for index in range(20)]
        for thread in pool:
            thread.start()
        time.sleep(0.02)
        start.set()
        for thread in pool:
            thread.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        for index in range(20):
            self.assertEqual(content.count("Thread%03d\n" % index), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002531
Antoine Pitrou6be88762010-05-03 16:48:20 +00002532 def test_flush_error_on_close(self):
2533 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2534 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002535 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002536 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002537 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002538 self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002539
2540 def test_multi_close(self):
2541 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2542 txt.close()
2543 txt.close()
2544 txt.close()
2545 self.assertRaises(ValueError, txt.flush)
2546
def test_unseekable(self):
    # tell() and seek() on a wrapper over MockUnseekableIO must raise
    # UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    unsupported = self.UnsupportedOperation
    self.assertRaises(unsupported, wrapper.tell)
    self.assertRaises(unsupported, wrapper.seek, 0)
2551
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002552 def test_readonly_attributes(self):
2553 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2554 buf = self.BytesIO(self.testdata)
2555 with self.assertRaises(AttributeError):
2556 txt.buffer = buf
2557
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    self.assertEqual(list(wrapper), ['jkl\n', 'opq\n'])
2568
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    # No flush() was issued; the raw object must already hold the bytes.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2578
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002579 def test_read_nonbytes(self):
2580 # Issue #17106
2581 # Crash when underlying read() returns non-bytes
2582 t = self.TextIOWrapper(self.StringIO('a'))
2583 self.assertRaises(TypeError, t.read, 1)
2584 t = self.TextIOWrapper(self.StringIO('a'))
2585 self.assertRaises(TypeError, t.readline)
2586 t = self.TextIOWrapper(self.StringIO('a'))
2587 self.assertRaises(TypeError, t.read)
2588
2589 def test_illegal_decoder(self):
2590 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10002591 # Bypass the early encoding check added in issue 20404
2592 def _make_illegal_wrapper():
2593 quopri = codecs.lookup("quopri")
2594 quopri._is_text_encoding = True
2595 try:
2596 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2597 newline='\n', encoding="quopri")
2598 finally:
2599 quopri._is_text_encoding = False
2600 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002601 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10002602 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002603 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10002604 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002605 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10002606 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002607 self.assertRaises(TypeError, t.read)
2608
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    #
    # Formats a small script that builds a TextIOWrapper from a __del__
    # method, passing **kwargs to the constructor, and returns whatever
    # assert_python_ok() returns for it.
    template = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """
    script = template.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
2629
2630 def test_create_at_shutdown_without_encoding(self):
2631 rc, out, err = self._check_create_at_shutdown()
2632 if err:
2633 # Can error out with a RuntimeError if the module state
2634 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10002635 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01002636 else:
2637 self.assertEqual("ok", out.decode().strip())
2638
2639 def test_create_at_shutdown_with_encoding(self):
2640 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
2641 errors='strict')
2642 self.assertFalse(err)
2643 self.assertEqual("ok", out.decode().strip())
2644
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002645
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002646class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01002647 io = io
Nick Coghlana9b15242014-02-04 22:11:18 +10002648 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002649
2650 def test_initialization(self):
2651 r = self.BytesIO(b"\xc3\xa9\n\n")
2652 b = self.BufferedReader(r, 1000)
2653 t = self.TextIOWrapper(b)
2654 self.assertRaises(TypeError, t.__init__, b, newline=42)
2655 self.assertRaises(ValueError, t.read)
2656 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2657 self.assertRaises(ValueError, t.read)
2658
2659 def test_garbage_collection(self):
2660 # C TextIOWrapper objects are collected, and collecting them flushes
2661 # all data to disk.
2662 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02002663 with support.check_warnings(('', ResourceWarning)):
2664 rawio = io.FileIO(support.TESTFN, "wb")
2665 b = self.BufferedWriter(rawio)
2666 t = self.TextIOWrapper(b, encoding="ascii")
2667 t.write("456def")
2668 t.x = t
2669 wr = weakref.ref(t)
2670 del t
2671 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002672 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002673 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002674 self.assertEqual(f.read(), b"456def")
2675
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002676 def test_rwpair_cleared_before_textio(self):
2677 # Issue 13070: TextIOWrapper's finalization would crash when called
2678 # after the reference to the underlying BufferedRWPair's writer got
2679 # cleared by the GC.
2680 for i in range(1000):
2681 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2682 t1 = self.TextIOWrapper(b1, encoding="ascii")
2683 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2684 t2 = self.TextIOWrapper(b2, encoding="ascii")
2685 # circular references
2686 t1.buddy = t2
2687 t2.buddy = t1
2688 support.gc_collect()
2689
2690
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002691class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01002692 io = pyio
Nick Coghlana9b15242014-02-04 22:11:18 +10002693 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002694
2695
2696class IncrementalNewlineDecoderTest(unittest.TestCase):
2697
def check_newline_decoding_utf8(self, decoder):
    # UTF-8 specific tests for a newline decoder
    def roundtrip(chunk, text, **options):
        # We exercise getstate() / setstate() as well as decode()
        snapshot = decoder.getstate()
        first = decoder.decode(chunk, **options)
        self.assertEqual(first, text)
        decoder.setstate(snapshot)
        second = decoder.decode(chunk, **options)
        self.assertEqual(second, text)

    def roundtrip_all(*pairs):
        # Feed several (bytes, expected text) pairs in order.
        for chunk, text in pairs:
            roundtrip(chunk, text)

    roundtrip(b'\xe8\xa2\x88', "\u8888")

    # The same character split into single bytes, done twice over.
    for _ in range(2):
        roundtrip_all((b'\xe8', ""), (b'\xa2', ""), (b'\x88', "\u8888"))

    # A dangling lead byte must be reported when the input is finalised.
    roundtrip(b'\xe8', "")
    self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

    decoder.reset()
    roundtrip_all((b'\n', "\n"), (b'\r', ""))
    roundtrip(b'', "\n", final=True)
    roundtrip(b'\r', "\n", final=True)

    roundtrip_all((b'\r', ""), (b'a', "\na"))

    roundtrip_all(
        (b'\r\r\n', "\n\n"),
        (b'\r', ""),
        (b'\r', "\n"),
        (b'\na', "\na"),
    )

    roundtrip_all(
        (b'\xe8\xa2\x88\r\n', "\u8888\n"),
        (b'\xe8\xa2\x88', "\u8888"),
        (b'\n', "\n"),
        (b'\xe8\xa2\x88\r', "\u8888"),
        (b'\n', "\n"),
    )
Guido van Rossum9b76da62007-04-11 01:09:03 +00002739
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002740 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002741 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002742 if encoding is not None:
2743 encoder = codecs.getincrementalencoder(encoding)()
2744 def _decode_bytewise(s):
2745 # Decode one byte at a time
2746 for b in encoder.encode(s):
2747 result.append(decoder.decode(bytes([b])))
2748 else:
2749 encoder = None
2750 def _decode_bytewise(s):
2751 # Decode one char at a time
2752 for c in s:
2753 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002754 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002755 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002756 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002757 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002758 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002759 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002760 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002761 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002762 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002763 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002764 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002765 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002766 input = "abc"
2767 if encoder is not None:
2768 encoder.reset()
2769 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002770 self.assertEqual(decoder.decode(input), "abc")
2771 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002772
2773 def test_newline_decoder(self):
2774 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002775 # None meaning the IncrementalNewlineDecoder takes unicode input
2776 # rather than bytes input
2777 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002778 'utf-16', 'utf-16-le', 'utf-16-be',
2779 'utf-32', 'utf-32-le', 'utf-32-be',
2780 )
2781 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002782 decoder = enc and codecs.getincrementaldecoder(enc)()
2783 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2784 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002785 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002786 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2787 self.check_newline_decoding_utf8(decoder)
2788
Antoine Pitrou66913e22009-03-06 23:40:56 +00002789 def test_newline_bytes(self):
2790 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2791 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002792 self.assertEqual(dec.newlines, None)
2793 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2794 self.assertEqual(dec.newlines, None)
2795 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2796 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002797 dec = self.IncrementalNewlineDecoder(None, translate=False)
2798 _check(dec)
2799 dec = self.IncrementalNewlineDecoder(None, translate=True)
2800 _check(dec)
2801
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoder tests, "C"-prefixed variant."""
2804
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoder tests, "Py"-prefixed variant."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00002807
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002808
Guido van Rossum01a27522007-03-07 01:00:12 +00002809# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002810
class MiscIOTest(unittest.TestCase):
    """Miscellaneous tests of the io namespace and of open().

    The implementation under test is reached through attributes on
    ``self`` (``self.io``, ``self.open``, ``self.IOBase``, ...), which the
    concrete subclasses are expected to provide.
    """

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        """Every name in ``__all__`` exists and has the expected base type."""
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        """Check the ``mode`` and ``name`` attributes at every layer."""
        # Every file is managed by a ``with`` block so that a failing
        # assertion cannot leave it open when tearDown unlinks TESTFN.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with support.check_warnings(('', DeprecationWarning)):
            f = self.open(support.TESTFN, "U")
        with f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # g shares f's descriptor without owning it (closefd=False), so
            # it is closed first, while the descriptor is still valid.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        """Every I/O method raises ValueError once the file is closed."""
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # Not every layer has these methods, hence the hasattr() guards.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readall"):
                self.assertRaises(ValueError, f.readall)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Write data of the type matching the mode (bytes or str).
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Build a reference cycle between the exception and its argument;
        # it must still be collected.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check raw, buffered and text files against *abcmodule*'s ABCs."""
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    def _check_warn_on_dealloc(self, *args, **kwargs):
        """Check that dropping an unclosed file emits a ResourceWarning."""
        # NOTE(review): this calls the module-level open() (presumably the
        # builtin), not self.open, so every subclass appears to exercise the
        # same implementation here -- confirm whether that is intentional.
        f = open(*args, **kwargs)
        r = repr(f)
        with self.assertWarns(ResourceWarning) as cm:
            f = None
            support.gc_collect()
        self.assertIn(r, str(cm.warning.args[0]))

    def test_warn_on_dealloc(self):
        self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
        self._check_warn_on_dealloc(support.TESTFN, "wb")
        self._check_warn_on_dealloc(support.TESTFN, "w")

    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
        """Like _check_warn_on_dealloc(), for files opened on a pipe fd."""
        fds = []
        def cleanup_fds():
            # Some descriptors were already closed by the file objects
            # that owned them; EBADF is therefore expected and ignored.
            for fd in fds:
                try:
                    os.close(fd)
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise
        self.addCleanup(cleanup_fds)
        r, w = os.pipe()
        fds += r, w
        self._check_warn_on_dealloc(r, *args, **kwargs)
        # When using closefd=False, there's no warning
        r, w = os.pipe()
        fds += r, w
        with warnings.catch_warnings(record=True) as recorded:
            # NOTE(review): module-level open() again rather than self.open;
            # see _check_warn_on_dealloc().
            open(r, *args, closefd=False, **kwargs)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_warn_on_dealloc_fd(self):
        self._check_warn_on_dealloc_fd("rb", buffering=0)
        self._check_warn_on_dealloc_fd("rb")
        self._check_warn_on_dealloc_fd("r")

    def test_pickling(self):
        # Pickling file objects is forbidden
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+b", "buffering": 0},
            ]:
            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
                with self.open(support.TESTFN, **kwargs) as f:
                    self.assertRaises(TypeError, pickle.dumps, f, protocol)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_bigbuf(self):
        self._test_nonblock_pipe_write(16*1024)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_smallbuf(self):
        self._test_nonblock_pipe_write(1024)

    def _set_non_blocking(self, fd):
        """Set O_NONBLOCK on *fd* using fcntl()."""
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        self.assertNotEqual(flags, -1)
        res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        self.assertEqual(res, 0)

    def _test_nonblock_pipe_write(self, bufsize):
        """Write through a non-blocking pipe and check nothing is lost.

        *bufsize* is the buffer size given to both buffered file objects.
        """
        sent = []
        received = []
        r, w = os.pipe()
        self._set_non_blocking(r)
        self._set_non_blocking(w)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for N in 9999, 73, 7574:
                try:
                    i = 0
                    # Write until the pipe is full and the write blocks.
                    while True:
                        msg = bytes([i % 26 + 97]) * N
                        sent.append(msg)
                        wf.write(msg)
                        i += 1

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    # Only part of the last message was accepted.
                    sent[-1] = sent[-1][:e.characters_written]
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # Drain what is left; read() returns None once nothing more is
            # available on the non-blocking pipe.
            received += iter(rf.read, None)

        sent, received = b''.join(sent), b''.join(received)
        # assertTrue rather than assertEqual: presumably to avoid dumping a
        # diff of two very large byte strings on failure.
        self.assertTrue(sent == received)
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)

    def test_create_fail(self):
        # 'x' mode fails if file is existing
        with self.open(support.TESTFN, 'w'):
            pass
        self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')

    def test_create_writes(self):
        # 'x' mode opens for writing
        with self.open(support.TESTFN, 'xb') as f:
            f.write(b"spam")
        with self.open(support.TESTFN, 'rb') as f:
            self.assertEqual(b"spam", f.read())

    def test_open_allargs(self):
        # there used to be a buffer overflow in the parser for rawmode
        self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3081
3082
class CMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``io`` module."""

    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        class OversizedReader(self.io.BufferedIOBase):
            # read() hands back far more data than any caller asked for.
            def read(self, n=-1):
                return b'x' * 10**6

        reader = OversizedReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)
3094
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``pyio`` module."""

    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003097
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003098
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Tests of I/O calls interrupted by SIGALRM.

    The implementation under test is reached through ``self.io``, which
    the concrete subclasses set.
    """

    def setUp(self):
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError from the signal handler.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data written (repeated to exceed the pipe
        size); *bytes* is what is expected to arrive on the read end.
        """
        read_results = []
        def _read():
            if hasattr(signal, 'pthread_sigmask'):
                # Keep SIGALRM out of this helper thread.
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # Bound up front so the finally clause can tell whether open()
        # succeeded instead of masking its error with UnboundLocalError.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed just below.  Since
            # the other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError,
                            wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
            finally:
                signal.alarm(0)
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check a write re-entered from a signal handler in the same thread.

        Either the reentrant call fails with RuntimeError or the handler's
        ZeroDivisionError propagates; both outcomes are accepted.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel any alarm still pending so it cannot fire after
            # tearDown() has restored the previous handler.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to str for
        comparison.
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound up front so the finally clause can tell whether open()
        # succeeded instead of masking its error with UnboundLocalError.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel any alarm still pending so it cannot fire after
            # tearDown() has restored the previous handler.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is a single unit of data; it is repeated N times to exceed
        the pipe's capacity.
        """
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        def _read():
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: install the second handler and re-arm the timer.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start the reader so the write can make progress.
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        # Bound up front so the finally clause can tell whether open()
        # succeeded instead of masking its error with UnboundLocalError.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel any alarm still pending so it cannot fire after
            # tearDown() has restored the previous handler.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3291
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003292
class CSignalsTest(SignalsTest):
    """SignalsTest bound to the ``io`` module."""

    io = io
3295
class PySignalsTest(SignalsTest):
    """SignalsTest bound to the ``pyio`` module."""

    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3303
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003304
def load_tests(*args):
    """Build the test suite for this module.

    Before collecting the tests, the io namespace and the mock classes are
    copied onto each test class as attributes: classes whose name starts
    with "C" receive the names taken from ``io``, classes whose name starts
    with "Py" receive the ones taken from ``pyio``.

    The arguments passed by the unittest ``load_tests`` protocol are
    accepted but not used.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name : getattr(io, name) for name in all_members}
    py_io_ns = {name : getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock exists as a "C"- and a "Py"-prefixed global; expose the
    # matching one under the unprefixed name.
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated and has been removed from recent
    # Python versions; loadTestsFromTestCase() is the documented equivalent.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite([loader.loadTestsFromTestCase(test)
                                for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003340
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()