blob: 4d178216bc14d63763157f9b52a8205ffe2b62d3 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Antoine Pitrou25f85d42015-04-13 19:41:47 +020038from test.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size.

    Opens this very source file in text mode and returns the ``_CHUNK_SIZE``
    attribute of the resulting text file object.
    """
    # latin-1 can decode any byte sequence, so opening never fails on encoding.
    with open(__file__, "r", encoding="latin-1") as f:
        return f._CHUNK_SIZE
52
53
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    Data handed out by readinto() comes from ``read_stack``; every write()
    is recorded in ``_write_stack``.  A ``None`` entry in the read stack makes
    the next readinto() return ``None``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0             # number of read calls made so far
        self._extraneous_reads = 0  # reads attempted after the stack ran dry

    def write(self, b):
        """Record a copy of *b* and report that all of it was written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Fill *buf* from the first entry of the read stack.

        Returns the number of bytes stored, ``None`` when the first entry is
        ``None`` (that entry is consumed), or 0 when the stack is empty.  An
        entry larger than *buf* is consumed piecewise: the unread tail stays
        at the front of the stack.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            # The whole entry fits: consume it.
            del self._read_stack[0]
            buf[:n] = data
            return n
        # Only part of the entry fits: keep the remainder for the next call.
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        return pos


class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    pass


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    pass
115
116
class MockRawIO(MockRawIOWithoutRead):
    """Mock raw stream that adds an explicit read() on top of the base mock."""

    def read(self, n=None):
        """Pop and return the next entry of the read stack, ignoring *n*.

        Returns ``b""`` and counts an extraneous read once the stack is empty.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted stack is expected here; anything else
            # (e.g. KeyboardInterrupt) must propagate.
            self._extraneous_reads += 1
            return b""


class CMockRawIO(MockRawIO, io.RawIOBase):
    pass


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000132
Guido van Rossuma9e20242007-03-08 00:43:48 +0000133
class MisbehavedRawIO(MockRawIO):
    """Mock raw stream whose methods deliberately return bogus values."""

    def write(self, b):
        # Claims twice as many bytes as the parent reported.
        return super().write(b) * 2

    def read(self, n=None):
        # Returns the parent's result repeated twice.
        return super().read(n) * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # The parent still fills buf, but the reported count is inflated
        # to five times the buffer size.
        super().readinto(buf)
        return len(buf) * 5


class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    pass


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    pass
156
157
class CloseFailureIO(MockRawIO):
    """Mock raw stream whose first close() marks it closed and raises OSError."""

    closed = 0

    def close(self):
        # Only the first call fails; later calls are silent no-ops.
        if not self.closed:
            self.closed = 1
            raise OSError


class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    pass


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    pass
171
172
class MockFileIO:
    """Mixin recording the result size of every read()/readinto() call.

    Meant to be combined with a BytesIO implementation; the sizes are
    appended to ``read_history`` in call order.
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        res = super().read(n)
        # Record the length of what was read, or None if nothing was returned.
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        res = super().readinto(b)
        self.read_history.append(res)
        return res


class CMockFileIO(MockFileIO, io.BytesIO):
    pass


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    pass
194
195
class MockUnseekableIO:
    """Mixin turning a stream into one that refuses to seek or tell.

    Concrete subclasses provide ``UnsupportedOperation``, the exception type
    raised by seek() and tell().
    """

    def seekable(self):
        return False

    def seek(self, *args):
        raise self.UnsupportedOperation("not seekable")

    def tell(self, *args):
        raise self.UnsupportedOperation("not seekable")


class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    UnsupportedOperation = pyio.UnsupportedOperation
211
212
class MockNonBlockWriterIO:
    """Mock raw writer that can simulate a write which would block.

    Written data accumulates in ``_write_stack``.  After block_on(char), a
    write whose data contains *char* is cut short just before it; a write
    that starts with *char* returns ``None`` and clears the blocker.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and reset the record."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Write *b*, honouring the current blocker character if any.

        Returns the number of bytes accepted, or ``None`` when the data
        starts with the blocker character.
        """
        b = bytes(b)
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                # Blocker not present: fall through to a full write.
                pass
            else:
                if n > 0:
                    # write data up to the first blocker
                    self._write_stack.append(b[:n])
                    return n
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(b)
        return len(b)


class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    BlockingIOError = pyio.BlockingIOError
262
Guido van Rossuma9e20242007-03-08 00:43:48 +0000263
class IOTest(unittest.TestCase):
    """Implementation-independent tests for the io module.

    The tests reach the implementation under test through instance
    attributes such as ``self.open``, ``self.FileIO``, ``self.BytesIO`` and
    ``self.UnsupportedOperation``.  Those attributes are not defined in this
    class; they are expected to be supplied from outside it.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed sequence of write/seek/truncate checks on *f*.

        On success *f* holds b"hello world\\n".
        """
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must not move the stream position
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed sequence of read/seek checks on *f*.

        *f* must contain b"hello world\\n".  With *buffered* true, argument-less
        read() calls are checked as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        """Check seek/tell/write/truncate around the ``LARGE`` offset on *f*."""
        # Use unittest assertions rather than bare asserts so the
        # preconditions are still checked when running under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_open_handles_NUL_chars(self):
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
        self.assertRaises(ValueError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform.startswith('win') or sys.platform == 'darwin':
            support.requires(
                'largefile',
                'test requires %s bytes and a long time to run' % self.LARGE)
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The file must be closed even when the body raised.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check that destroying an instance of a *base* subclass runs
        __del__, close and flush in that order."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            f.f = f  # self-reference: only the cycle collector can free it
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def check_flush_error_on_close(self, *args, **kwargs):
        # Test that the file is closed despite failed flush
        # and that flush() is called before file closed.
        f = self.open(*args, **kwargs)
        closed = []
        def bad_flush():
            closed[:] = [f.closed]
            raise OSError()
        f.flush = bad_flush
        self.assertRaises(OSError, f.close)  # exception not swallowed
        self.assertTrue(f.closed)
        self.assertTrue(closed)      # flush() called
        self.assertFalse(closed[0])  # flush() called before file closed
        f.flush = lambda: None  # break reference loop

    def test_flush_error_on_close(self):
        flags = os.O_WRONLY | os.O_CREAT
        # In order: raw file (Issue #5700: io.FileIO calls flush() after
        # file closed), buffered io, text io.
        for mode, kwargs in (('wb', {'buffering': 0}), ('wb', {}), ('w', {})):
            self.check_flush_error_on_close(support.TESTFN, mode, **kwargs)
            fd = os.open(support.TESTFN, flags)
            self.check_flush_error_on_close(fd, mode, **kwargs)
            fd = os.open(support.TESTFN, flags)
            try:
                self.check_flush_error_on_close(fd, mode, closefd=False,
                                                **kwargs)
            finally:
                # With closefd=False the descriptor is ours to close, even
                # when the check above fails.
                os.close(fd)

    def test_multi_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))

    def test_opener(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        fd = os.open(support.TESTFN, os.O_RDONLY)
        def opener(path, flags):
            # Ignore the requested path and hand back the descriptor
            # opened above.
            return fd
        with self.open("non-existent", "r", opener=opener) as f:
            self.assertEqual(f.read(), "egg\n")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()

    def test_nonbuffered_textio(self):
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', buffering=0)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_invalid_newline(self):
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', newline='invalid')
            support.gc_collect()
        self.assertEqual(recorded, [])
696
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200697
class CIOTest(IOTest):

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        # Self-reference: the instance can only be reclaimed by the cycle
        # collector.
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        # assertIsNone shows the surviving referent on failure, unlike
        # assertTrue(... is None) which only reports "False is not true".
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000717
class PyIOTest(IOTest):
    """Run the inherited IOTest cases unchanged; no tests are added here."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000720
Guido van Rossuma9e20242007-03-08 00:43:48 +0000721
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # This is a mixin: concrete test classes supply ``self.tp`` (the
    # buffered type under test) and the mock raw-I/O classes used below.

    def test_detach(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        # A second detach() has nothing left to hand back.
        self.assertRaises(ValueError, buf.detach)

        repr(buf)  # Should still work

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    @unittest.skip('test having existential crisis')
    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 9)

    def test_override_destructor(self):
        # A subclass overriding __del__, close() and flush() must see them
        # invoked in that order when the object is destroyed.
        tp = self.tp
        record = []

        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()

            def close(self):
                record.append(2)
                super().close()

            def flush(self):
                record.append(3)
                super().flush()

        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        # Query writability before the object goes away.
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        def _with():
            with bufio:
                pass

        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()

        def f():
            self.tp(rawio).xyzzy

        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception OSError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        # The repr picks up the raw object's name, str or bytes alike.
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        closed = []

        def bad_flush():
            # Snapshot both "closed" flags at the moment flush() runs.
            closed[:] = [b.closed, raw.closed]
            raise OSError()

        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(OSError, b.close)  # exception not swallowed
        self.assertTrue(b.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(closed)      # flush() called
        self.assertFalse(closed[0])  # flush() called before file closed
        self.assertFalse(closed[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and close() fail, the close() error propagates
        # with the flush() error attached as its __context__.
        raw = self.MockRawIO()

        def bad_flush():
            raise OSError('flush')

        def bad_close():
            raise OSError('close')

        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(OSError) as err:  # exception not swallowed
            b.close()
        self.assertEqual(err.exception.args, ('close',))
        self.assertIsInstance(err.exception.__context__, OSError)
        self.assertEqual(err.exception.__context__.args, ('flush',))
        self.assertFalse(b.closed)

        # b is still open at this point; neutralise the failing hooks so
        # that closing it again later (e.g. at finalization) cannot fail.
        raw.close = lambda: None
        b.flush = lambda: None

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw = self.MockRawIO()

        def bad_flush():
            # Deliberately undefined name: raises NameError.
            raise non_existing_flush

        def bad_close():
            # Deliberately undefined name: raises NameError.
            raise non_existing_close

        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(NameError) as err:  # exception not swallowed
            b.close()
        self.assertIn('non_existing_close', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('non_existing_flush', str(err.exception.__context__))
        self.assertFalse(b.closed)

        # b is still open at this point; neutralise the failing hooks so
        # that closing it again later (e.g. at finalization) cannot fail.
        raw.close = lambda: None
        b.flush = lambda: None

    def test_multi_close(self):
        # Repeated close() calls are accepted; flush() afterwards is not.
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)

    def test_unseekable(self):
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)

    def test_readonly_attributes(self):
        # The ``raw`` attribute cannot be rebound from outside.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        x = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buf.raw = x
887
Guido van Rossum78892e42007-04-06 17:31:18 +0000888
class SizeofTest:
    # Mixin checking how sys.getsizeof() accounts for the internal buffer.

    @support.cpython_only
    def test_sizeof(self):
        small_buffer = 4096
        large_buffer = 8192
        # Size of the object once the buffer itself is subtracted ...
        first_raw = self.MockRawIO()
        first_bufio = self.tp(first_raw, buffer_size=small_buffer)
        overhead = sys.getsizeof(first_bufio) - small_buffer
        # ... must be the same whatever buffer size is requested.
        second_raw = self.MockRawIO()
        second_bufio = self.tp(second_raw, buffer_size=large_buffer)
        self.assertEqual(sys.getsizeof(second_bufio),
                         overhead + large_buffer)

    @support.cpython_only
    def test_buffer_freeing(self):
        buffer_size = 4096
        raw = self.MockRawIO()
        buffered = self.tp(raw, buffer_size=buffer_size)
        overhead = sys.getsizeof(buffered) - buffer_size
        buffered.close()
        # After close() the buffer no longer counts towards the size.
        self.assertEqual(sys.getsizeof(buffered), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200910
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Mode used by test_threads when opening the raw file.
    read_mode = "rb"

    def test_constructor(self):
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        # Re-initialising an existing object is allowed.
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        # Non-positive buffer sizes are rejected.
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object created with __new__ alone can be discarded ...
        bufio = self.tp.__new__(self.tp)
        del bufio
        # ... fails cleanly when used ...
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        # ... and works once __init__ has run.
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # Short read: only the first byte of b is overwritten.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

    def test_readinto1(self):
        buffer_size = 10
        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
        bufio = self.tp(rawio, buffer_size=buffer_size)
        b = bytearray(2)
        self.assertEqual(bufio.peek(3), b'abc')
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 1)
        self.assertEqual(b[:1], b"c")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"de")
        self.assertEqual(rawio._reads, 2)
        b = bytearray(2*buffer_size)
        self.assertEqual(bufio.peek(3), b'fgh')
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(bufio.readinto1(b), 6)
        self.assertEqual(b[:6], b"fghjkl")
        self.assertEqual(rawio._reads, 4)

    def _check_readinto_array(self, method_name):
        # Shared body of test_readinto_array and test_readinto1_array.
        # *method_name* names the buffered-reader method to exercise
        # ("readinto" or "readinto1").
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        # Precondition check; a real assertion so it survives python -O.
        self.assertNotEqual(len(b), 16)

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = getattr(bufio, method_name)(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readinto_array(self):
        self._check_readinto_array("readinto")

    def test_readinto1_array(self):
        self._check_readinto_array("readinto1")

    def test_readlines(self):
        def bufio():
            # Fresh reader over the same three chunks for every call.
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to bufio.read(), and the
        # read history expected on the raw object afterwards.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        # Same outcome once some data has been read.
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1178
1179
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialisation, read() must raise ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            # Self-reference: only the cycle collector can reclaim f.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        # assertIsNone shows the surviving referent on failure, unlike
        # assertTrue(... is None) which only reports "False is not true".
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1226
1227
class PyBufferedReaderTest(BufferedReaderTest):
    # Run the inherited BufferedReaderTest cases against the reader class
    # exposed by the ``pyio`` module.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001230
Guido van Rossuma9e20242007-03-08 00:43:48 +00001231
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001232class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1233 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001234
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001235 def test_constructor(self):
1236 rawio = self.MockRawIO()
1237 bufio = self.tp(rawio)
1238 bufio.__init__(rawio)
1239 bufio.__init__(rawio, buffer_size=1024)
1240 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001241 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001242 bufio.flush()
1243 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1244 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1245 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1246 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001247 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001248 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001249 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001250
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001251 def test_uninitialized(self):
1252 bufio = self.tp.__new__(self.tp)
1253 del bufio
1254 bufio = self.tp.__new__(self.tp)
1255 self.assertRaisesRegex((ValueError, AttributeError),
1256 'uninitialized|has no attribute',
1257 bufio.write, b'')
1258 bufio.__init__(self.MockRawIO())
1259 self.assertEqual(bufio.write(b''), 0)
1260
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001261 def test_detach_flush(self):
1262 raw = self.MockRawIO()
1263 buf = self.tp(raw)
1264 buf.write(b"howdy!")
1265 self.assertFalse(raw._write_stack)
1266 buf.detach()
1267 self.assertEqual(raw._write_stack, [b"howdy!"])
1268
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001269 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001270 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001271 writer = self.MockRawIO()
1272 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001273 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001274 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001275
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001276 def test_write_overflow(self):
1277 writer = self.MockRawIO()
1278 bufio = self.tp(writer, 8)
1279 contents = b"abcdefghijklmnop"
1280 for n in range(0, len(contents), 3):
1281 bufio.write(contents[n:n+3])
1282 flushed = b"".join(writer._write_stack)
1283 # At least (total - 8) bytes were implicitly flushed, perhaps more
1284 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001285 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001286
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001287 def check_writes(self, intermediate_func):
1288 # Lots of writes, test the flushed output is as expected.
1289 contents = bytes(range(256)) * 1000
1290 n = 0
1291 writer = self.MockRawIO()
1292 bufio = self.tp(writer, 13)
1293 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1294 def gen_sizes():
1295 for size in count(1):
1296 for i in range(15):
1297 yield size
1298 sizes = gen_sizes()
1299 while n < len(contents):
1300 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001301 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001302 intermediate_func(bufio)
1303 n += size
1304 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001305 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001306
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001307 def test_writes(self):
1308 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001309
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001310 def test_writes_and_flushes(self):
1311 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001312
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001313 def test_writes_and_seeks(self):
1314 def _seekabs(bufio):
1315 pos = bufio.tell()
1316 bufio.seek(pos + 1, 0)
1317 bufio.seek(pos - 1, 0)
1318 bufio.seek(pos, 0)
1319 self.check_writes(_seekabs)
1320 def _seekrel(bufio):
1321 pos = bufio.seek(0, 1)
1322 bufio.seek(+1, 1)
1323 bufio.seek(-1, 1)
1324 bufio.seek(pos, 0)
1325 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001326
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001327 def test_writes_and_truncates(self):
1328 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001329
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001330 def test_write_non_blocking(self):
1331 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001332 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001333
Ezio Melottib3aedd42010-11-20 19:04:17 +00001334 self.assertEqual(bufio.write(b"abcd"), 4)
1335 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001336 # 1 byte will be written, the rest will be buffered
1337 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001338 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001339
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001340 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1341 raw.block_on(b"0")
1342 try:
1343 bufio.write(b"opqrwxyz0123456789")
1344 except self.BlockingIOError as e:
1345 written = e.characters_written
1346 else:
1347 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001348 self.assertEqual(written, 16)
1349 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001350 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001351
Ezio Melottib3aedd42010-11-20 19:04:17 +00001352 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001353 s = raw.pop_written()
1354 # Previously buffered bytes were flushed
1355 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001356
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001357 def test_write_and_rewind(self):
1358 raw = io.BytesIO()
1359 bufio = self.tp(raw, 4)
1360 self.assertEqual(bufio.write(b"abcdef"), 6)
1361 self.assertEqual(bufio.tell(), 6)
1362 bufio.seek(0, 0)
1363 self.assertEqual(bufio.write(b"XY"), 2)
1364 bufio.seek(6, 0)
1365 self.assertEqual(raw.getvalue(), b"XYcdef")
1366 self.assertEqual(bufio.write(b"123456"), 6)
1367 bufio.flush()
1368 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001369
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001370 def test_flush(self):
1371 writer = self.MockRawIO()
1372 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001373 bufio.write(b"abc")
1374 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001375 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001376
def test_writelines(self):
    # writelines() with a plain list of bytes objects.
    chunks = [b'ab', b'cd', b'ef']
    raw = self.MockRawIO()
    buffered = self.tp(raw, 8)
    buffered.writelines(chunks)
    buffered.flush()
    self.assertEqual(b''.join(raw._write_stack), b'abcdef')
1384
def test_writelines_userlist(self):
    # writelines() with a UserList instead of a real list.
    chunks = UserList([b'ab', b'cd', b'ef'])
    raw = self.MockRawIO()
    buffered = self.tp(raw, 8)
    buffered.writelines(chunks)
    buffered.flush()
    self.assertEqual(b''.join(raw._write_stack), b'abcdef')
1392
def test_writelines_error(self):
    # Each of these arguments must be rejected with TypeError: a list of
    # ints, None, and a str.
    raw = self.MockRawIO()
    buffered = self.tp(raw, 8)
    for bad_lines in ([1, 2, 3], None, 'abc'):
        with self.assertRaises(TypeError):
            buffered.writelines(bad_lines)
1399
def test_destructor(self):
    # Dropping the last reference to the buffered object (followed by a
    # collection) must still deliver the pending bytes to the raw stream.
    raw = self.MockRawIO()
    buffered = self.tp(raw, 8)
    buffered.write(b"abc")
    del buffered
    support.gc_collect()
    self.assertEqual(b"abc", raw._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001407
def test_truncate(self):
    # Truncate implicitly flushes the buffer.
    with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
        buffered = self.tp(raw, 8)
        buffered.write(b"abcdef")
        self.assertEqual(buffered.truncate(3), 3)
        # The position is expected to stay where the write left it.
        self.assertEqual(buffered.tell(), 6)
    # Re-open the file unbuffered and check what actually landed on disk.
    with self.open(support.TESTFN, "rb", buffering=0) as on_disk:
        self.assertEqual(on_disk.read(), b"abc")
1417
@unittest.skipUnless(threading, 'Threading required for this test.')
@support.requires_resource('cpu')
def test_threads(self):
    try:
        # Write out many bytes from many threads and test they were
        # all flushed.
        N = 1000
        contents = bytes(range(256)) * N
        # Slice the payload into chunks of alternating size 1 and 19.
        chunk_sizes = cycle([1, 19])
        pending = deque()
        offset = 0
        while offset < len(contents):
            step = next(chunk_sizes)
            pending.append(contents[offset:offset + step])
            offset += step
        del contents
        # We use a real file object because it allows us to
        # exercise situations where the GIL is released before
        # writing the buffer to the raw streams. This is in addition
        # to concurrency issues due to switching threads in the middle
        # of Python code.
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []

            def worker():
                # Drain the shared queue; an empty queue ends the thread.
                try:
                    while True:
                        try:
                            chunk = pending.popleft()
                        except IndexError:
                            return
                        bufio.write(chunk)
                except Exception as exc:
                    errors.append(exc)
                    raise

            threads = [threading.Thread(target=worker) for _ in range(20)]
            with support.start_threads(threads):
                time.sleep(0.02) # yield
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            bufio.close()
        with self.open(support.TESTFN, "rb") as result_file:
            written = result_file.read()
        # Every byte value must occur exactly N times in the output.
        for value in range(256):
            self.assertEqual(written.count(bytes([value])), N)
    finally:
        support.unlink(support.TESTFN)
1465
def test_misbehaved_io(self):
    # seek(), tell() and write() on top of a MisbehavedRawIO must each
    # raise OSError.
    raw = self.MisbehavedRawIO()
    buffered = self.tp(raw, 5)
    with self.assertRaises(OSError):
        buffered.seek(0)
    with self.assertRaises(OSError):
        buffered.tell()
    with self.assertRaises(OSError):
        buffered.write(b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001472
def test_max_buffer_size_removal(self):
    # A third positional argument (the former max_buffer_size slot, going
    # by the test name) must be rejected with TypeError.
    with self.assertRaises(TypeError):
        self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001476
def test_write_error_on_close(self):
    # If the raw write fails while close() flushes, the error must
    # propagate and the buffered object must still end up closed.
    raw = self.MockRawIO()

    def failing_write(data):
        raise OSError()

    raw.write = failing_write
    buffered = self.tp(raw)
    buffered.write(b'spam')
    with self.assertRaises(OSError):  # exception not swallowed
        buffered.close()
    self.assertTrue(buffered.closed)
1486
Benjamin Peterson59406a92009-03-26 17:10:29 +00001487
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriter tests against io.BufferedWriter and adds
    # a few checks specific to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            buffered.__init__(raw, sys.maxsize)

    def test_initialization(self):
        # Re-initializing with an invalid buffer size must fail, and the
        # object must refuse writes afterwards.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                buffered.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                buffered.write(b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends up in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            writer = self.tp(rawio)
            writer.write(b"123xxx")
            # Create a reference cycle so only the cyclic GC can free it.
            writer.x = writer
            wr = weakref.ref(writer)
            del writer
            support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as on_disk:
            self.assertEqual(on_disk.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1531
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001532
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriter tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001535
class BufferedRWPairTest(unittest.TestCase):
    # Shared tests for BufferedRWPair.  The class under test is read from
    # ``self.tp``; MockRawIO, BytesIO and UnsupportedOperation are likewise
    # looked up as attributes of the test instance.

    def test_constructor(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)

    def test_uninitialized(self):
        # An object created with __new__ only must be safely destroyable,
        # must reject I/O, and must work once __init__ has been called.
        rw_pair = self.tp.__new__(self.tp)
        del rw_pair
        rw_pair = self.tp.__new__(self.tp)
        expected_errors = (ValueError, AttributeError)
        expected_message = 'uninitialized|has no attribute'
        self.assertRaisesRegex(expected_errors, expected_message,
                               rw_pair.read, 0)
        self.assertRaisesRegex(expected_errors, expected_message,
                               rw_pair.write, b'')
        rw_pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(rw_pair.read(0), b'')
        self.assertEqual(rw_pair.write(b''), 0)

    def test_detach(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        with self.assertRaises(self.UnsupportedOperation):
            rw_pair.detach()

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument must be rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        with self.assertRaises(OSError):
            self.tp(NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        with self.assertRaises(OSError):
            self.tp(self.MockRawIO(), NotWriteable())

    def test_read(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(rw_pair.read(3), b"abc")
        self.assertEqual(rw_pair.read(1), b"d")
        self.assertEqual(rw_pair.read(), b"ef")
        # read(None) on a fresh pair returns everything.
        rw_pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(rw_pair.read(None), b"abc")

    def test_readlines(self):
        def fresh_pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        all_lines = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(fresh_pair().readlines(), all_lines)
        self.assertEqual(fresh_pair().readlines(), all_lines)
        self.assertEqual(fresh_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(rw_pair.read1(3), b"abc")

    def test_readinto(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        target = bytearray(5)
        self.assertEqual(rw_pair.readinto(target), 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        rw_pair = self.tp(self.MockRawIO(), sink)

        # Each write is flushed separately, so the raw writer sees two writes.
        for payload in (b"abc", b"def"):
            rw_pair.write(payload)
            rw_pair.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertTrue(rw_pair.peek(3).startswith(b"abc"))
        # peek() must not have consumed anything.
        self.assertEqual(rw_pair.read(3), b"abc")

    def test_readable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.readable())

    def test_writeable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)
        rw_pair.close()
        self.assertTrue(rw_pair.closed)

    def test_reader_close_error_on_close(self):
        # Only the reader's close() fails: the error propagates, while the
        # writer side is still closed.
        def failing_reader_close():
            reader_non_existing

        reader = self.MockRawIO()
        reader.close = failing_reader_close
        writer = self.MockRawIO()
        rw_pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        self.assertIn('reader_non_existing', str(caught.exception))
        self.assertTrue(rw_pair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

    def test_writer_close_error_on_close(self):
        # Only the writer's close() fails: the error propagates, while the
        # reader side is still closed.
        def failing_writer_close():
            writer_non_existing

        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = failing_writer_close
        rw_pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        self.assertIn('writer_non_existing', str(caught.exception))
        self.assertFalse(rw_pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

    def test_reader_writer_close_error_on_close(self):
        # Both close() calls fail: the reader's error is raised with the
        # writer's error attached as its __context__.
        def failing_reader_close():
            reader_non_existing

        def failing_writer_close():
            writer_non_existing

        reader = self.MockRawIO()
        reader.close = failing_reader_close
        writer = self.MockRawIO()
        writer.close = failing_writer_close
        rw_pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        raised = caught.exception
        self.assertIn('reader_non_existing', str(raised))
        self.assertIsInstance(raised.__context__, NameError)
        self.assertIn('writer_non_existing', str(raised.__context__))
        self.assertFalse(rw_pair.closed)
        self.assertFalse(reader.closed)
        self.assertFalse(writer.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side is one.
        scenarios = (
            (False, False, self.assertFalse),
            (True, False, self.assertTrue),
            (False, True, self.assertTrue),
            (True, True, self.assertTrue),
        )
        for reader_tty, writer_tty, check in scenarios:
            rw_pair = self.tp(SelectableIsAtty(reader_tty),
                              SelectableIsAtty(writer_tty))
            check(rw_pair.isatty())

    def test_weakref_clearing(self):
        # Drop the pair first, then the weak reference to it.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        weak = weakref.ref(rw_pair)
        rw_pair = None
        weak = None  # Shouldn't segfault.
1719
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001722
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001725
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001726
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared tests for BufferedRandom: reuses the reader and writer suites and
    # adds tests that interleave reads, writes and seeks on one object.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_constructor(self)

    def test_uninitialized(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        random_io = self.tp(raw, 8)

        self.assertEqual(b"as", random_io.read(2))
        random_io.write(b"ddd")
        random_io.write(b"eee")
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", random_io.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        random_io = self.tp(raw)

        self.assertEqual(b"as", random_io.read(2))
        self.assertEqual(2, random_io.tell())
        random_io.seek(0, 0)
        self.assertEqual(b"asdf", random_io.read(4))

        random_io.write(b"123f")
        random_io.seek(0, 0)
        self.assertEqual(b"asdf123fl", random_io.read())
        self.assertEqual(9, random_io.tell())
        # Seek relative to the end, then relative to the current position.
        random_io.seek(-4, 2)
        self.assertEqual(5, random_io.tell())
        random_io.seek(2, 1)
        self.assertEqual(7, random_io.tell())
        self.assertEqual(b"fl", random_io.read(11))
        random_io.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        with self.assertRaises(TypeError):
            random_io.seek(0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario: read_func(bufio[, n]) is the read primitive being
        # exercised around writes and flushes.
        raw = self.BytesIO(b"abcdefghi")
        random_io = self.tp(raw)

        self.assertEqual(b"ab", read_func(random_io, 2))
        random_io.write(b"12")
        self.assertEqual(b"ef", read_func(random_io, 2))
        self.assertEqual(6, random_io.tell())
        random_io.flush()
        self.assertEqual(6, random_io.tell())
        self.assertEqual(b"ghi", read_func(random_io))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        random_io.flush()
        random_io.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(random_io, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # With no explicit size, use a buffer that is comfortably large.
            target = bytearray(n if n >= 0 else 9999)
            count = bufio.readinto(target)
            return bytes(target[:count])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            peeked = bufio.peek(n)
            if n != -1:
                peeked = peeked[:n]
            # Advance past what was peeked so this acts like a read.
            bufio.seek(len(peeked), 1)
            return peeked
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        random_io = self.tp(raw)

        for payload in (b"123", b"45"):
            random_io.write(payload)
            random_io.flush()
        random_io.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", random_io.read())

    def test_threads(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_threads(self)

    def test_writes_and_peek(self):
        def _peek_here(bufio):
            bufio.peek(1)
        self.check_writes(_peek_here)

        def _peek_one_back(bufio):
            # Peek at the previous byte, then restore the position.
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)
        self.check_writes(_peek_one_back)

    def test_writes_and_reads(self):
        def _reread(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_reread)

    def test_writes_and_read1s(self):
        def _reread1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_reread1)

    def test_writes_and_readintos(self):
        def _rereadinto(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_rereadinto)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in (1, 5):
            raw = self.BytesIO(b"A" * 10)
            random_io = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(random_io.read(1), b"A")
            self.assertEqual(random_io.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            random_io.write(b"B" * overwrite_size)
            end_pos = overwrite_size + 1
            self.assertEqual(random_io.tell(), end_pos)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            random_io.flush()
            self.assertEqual(random_io.tell(), end_pos)
            expected = (b"A" + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            self.assertEqual(raw.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for first in range(0, size):
            for second in range(first, size):
                raw = self.BytesIO(original)
                random_io = self.tp(raw, 100)
                mutate(random_io, first, second)
                random_io.flush()
                # When both positions coincide the later write (1) wins.
                expected = bytearray(original)
                expected[second] = 2
                expected[first] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (first, second))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        random_io = self.tp(raw, 100)
        self.assertEqual(random_io.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(random_io.truncate(), 2)
        self.assertEqual(random_io.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(random_io.truncate(), 4)

    def test_misbehaved_io(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            one_byte = bytearray(1)
            f.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Alternate single-byte writes with readline() calls.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as f:
            f.write(b'1')
            self.assertEqual(f.readline(), b'b\n')
            f.write(b'2')
            self.assertEqual(f.readline(), b'def\n')
            f.write(b'3')
            self.assertEqual(f.readline(), b'\n')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1954
R David Murray67bfe802013-02-23 21:51:05 -05001955
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Runs the shared BufferedRandom tests against io.BufferedRandom and adds
    # a few checks specific to that implementation.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            buffered.__init__(raw, sys.maxsize)

    def test_garbage_collection(self):
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1977
1978
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandom tests against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1981
1982
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.

class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        # Back to the initial state: I = O = 1 and nothing buffered.
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        # I and O are XORed with 1 so that flags = 0 after reset().
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        # Inverse of getstate(): unpack the buffered bytes and the flags.
        buffered, flags = state
        self.buffer = bytearray(buffered)
        in_flag, out_flag = divmod(flags, 100)
        self.i = in_flag ^ 1
        self.o = out_flag ^ 1

    def decode(self, input, final=False):
        period = ord('.')
        pieces = []
        for byte in input:
            # self.i is re-read for every byte because process_word() may
            # change it in the middle of the input.
            if self.i == 0:
                # variable-length, terminated with period
                if byte != period:
                    self.buffer.append(byte)
                elif self.buffer:
                    pieces.append(self.process_word())
            else:
                # fixed-length, terminate after self.i bytes
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        # Consume the buffered word: either update I or O, or return the
        # word's output form.  The buffer is emptied afterwards.
        result = ''
        marker = self.buffer[0]
        if marker == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif marker == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            result = self.buffer.decode('ascii')
            if self.o:
                # pad with hyphens and truncate to exactly O characters
                result = result.ljust(self.o, '-')[:self.o]
            result += '.'
        self.buffer = bytearray()
        return result

    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        # Codec search function: answers only for 'test_decoder', and only
        # while codecEnabled is true.
        if not (cls.codecEnabled and name == 'test_decoder'):
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
2078
# Register the previous decoder's lookup function as a codec search function
# for testing.  The function only answers while
# StatefulIncrementalDecoder.codecEnabled is true, so it is
# disabled by default; tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2082
2083
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002126
2127class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002128
def setUp(self):
    # Shared fixture: bytes mixing \r\n, \r and \n line endings, and the
    # same text with every line ending written as \n.
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = "AAA\nBBB\nCCC\nDDD\nEEE\n"
    # Remove support.TESTFN before the test runs.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002133
def tearDown(self):
    # Remove support.TESTFN again once the test has finished.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002136
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002137 def test_constructor(self):
2138 r = self.BytesIO(b"\xc3\xa9\n\n")
2139 b = self.BufferedReader(r, 1000)
2140 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002141 t.__init__(b, encoding="latin-1", newline="\r\n")
2142 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002143 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002144 t.__init__(b, encoding="utf-8", line_buffering=True)
2145 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002146 self.assertEqual(t.line_buffering, True)
2147 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002148 self.assertRaises(TypeError, t.__init__, b, newline=42)
2149 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2150
Nick Coghlana9b15242014-02-04 22:11:18 +10002151 def test_non_text_encoding_codecs_are_rejected(self):
2152 # Ensure the constructor complains if passed a codec that isn't
2153 # marked as a text encoding
2154 # http://bugs.python.org/issue20404
2155 r = self.BytesIO()
2156 b = self.BufferedWriter(r)
2157 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2158 self.TextIOWrapper(b, encoding="hex")
2159
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002160 def test_detach(self):
2161 r = self.BytesIO()
2162 b = self.BufferedWriter(r)
2163 t = self.TextIOWrapper(b)
2164 self.assertIs(t.detach(), b)
2165
2166 t = self.TextIOWrapper(b, encoding="ascii")
2167 t.write("howdy")
2168 self.assertFalse(r.getvalue())
2169 t.detach()
2170 self.assertEqual(r.getvalue(), b"howdy")
2171 self.assertRaises(ValueError, t.detach)
2172
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002173 # Operations independent of the detached stream should still work
2174 repr(t)
2175 self.assertEqual(t.encoding, "ascii")
2176 self.assertEqual(t.errors, "strict")
2177 self.assertFalse(t.line_buffering)
2178
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002179 def test_repr(self):
2180 raw = self.BytesIO("hello".encode("utf-8"))
2181 b = self.BufferedReader(raw)
2182 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002183 modname = self.TextIOWrapper.__module__
2184 self.assertEqual(repr(t),
2185 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2186 raw.name = "dummy"
2187 self.assertEqual(repr(t),
2188 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002189 t.mode = "r"
2190 self.assertEqual(repr(t),
2191 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002192 raw.name = b"dummy"
2193 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002194 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002195
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002196 t.buffer.detach()
2197 repr(t) # Should not raise an exception
2198
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002199 def test_line_buffering(self):
2200 r = self.BytesIO()
2201 b = self.BufferedWriter(r, 1000)
2202 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002203 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002204 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002205 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002206 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002207 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002208 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002209
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for name in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(name, None)

        current_locale_encoding = locale.getpreferredencoding(False)
        wrapper = self.TextIOWrapper(self.BytesIO())
        self.assertEqual(wrapper.encoding, current_locale_encoding)
    finally:
        # Put the environment back exactly as it was found.
        os.environ.clear()
        os.environ.update(saved_environ)
2227
@support.cpython_only
def test_device_encoding(self):
    # Issue 15989: the constructor must raise OverflowError when fileno()
    # returns a value just above INT_MAX or UINT_MAX.
    import _testcapi
    raw = self.BytesIO()
    for fd in (_testcapi.INT_MAX + 1, _testcapi.UINT_MAX + 1):
        raw.fileno = lambda fd=fd: fd
        self.assertRaises(OverflowError, self.TextIOWrapper, raw)
2237
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002238 def test_encoding(self):
2239 # Check the encoding attribute is always set, and valid
2240 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002241 t = self.TextIOWrapper(b, encoding="utf-8")
2242 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002243 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002244 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002245 codecs.lookup(t.encoding)
2246
2247 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002248 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002249 b = self.BytesIO(b"abc\n\xff\n")
2250 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002251 self.assertRaises(UnicodeError, t.read)
2252 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002253 b = self.BytesIO(b"abc\n\xff\n")
2254 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002255 self.assertRaises(UnicodeError, t.read)
2256 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002257 b = self.BytesIO(b"abc\n\xff\n")
2258 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002259 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002260 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002261 b = self.BytesIO(b"abc\n\xff\n")
2262 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002263 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002264
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002265 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002266 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002267 b = self.BytesIO()
2268 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002269 self.assertRaises(UnicodeError, t.write, "\xff")
2270 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002271 b = self.BytesIO()
2272 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002273 self.assertRaises(UnicodeError, t.write, "\xff")
2274 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002275 b = self.BytesIO()
2276 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002277 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002278 t.write("abc\xffdef\n")
2279 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002280 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002281 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002282 b = self.BytesIO()
2283 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002284 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002285 t.write("abc\xffdef\n")
2286 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002287 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002288
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002289 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002290 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2291
2292 tests = [
2293 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002294 [ '', input_lines ],
2295 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2296 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2297 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002298 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002299 encodings = (
2300 'utf-8', 'latin-1',
2301 'utf-16', 'utf-16-le', 'utf-16-be',
2302 'utf-32', 'utf-32-le', 'utf-32-be',
2303 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002304
Guido van Rossum8358db22007-08-18 21:39:55 +00002305 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002306 # character in TextIOWrapper._pending_line.
2307 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002308 # XXX: str.encode() should return bytes
2309 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002310 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002311 for bufsize in range(1, 10):
2312 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002313 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2314 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002315 encoding=encoding)
2316 if do_reads:
2317 got_lines = []
2318 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002319 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002320 if c2 == '':
2321 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002322 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002323 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002324 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002325 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002326
2327 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002328 self.assertEqual(got_line, exp_line)
2329 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002330
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002331 def test_newlines_input(self):
2332 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002333 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2334 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002335 (None, normalized.decode("ascii").splitlines(keepends=True)),
2336 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002337 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2338 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2339 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002340 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002341 buf = self.BytesIO(testdata)
2342 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002343 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002344 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002345 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002346
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002347 def test_newlines_output(self):
2348 testdict = {
2349 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2350 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2351 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2352 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2353 }
2354 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2355 for newline, expected in tests:
2356 buf = self.BytesIO()
2357 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2358 txt.write("AAA\nB")
2359 txt.write("BB\nCCC\n")
2360 txt.write("X\rY\r\nZ")
2361 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002362 self.assertEqual(buf.closed, False)
2363 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002364
def test_destructor(self):
    # Record what the underlying stream holds at the moment it is closed.
    seen_at_close = []
    base = self.BytesIO
    class MyBytesIO(base):
        def close(self):
            seen_at_close.append(self.getvalue())
            base.close(self)
    stream = MyBytesIO()
    wrapper = self.TextIOWrapper(stream, encoding="ascii")
    wrapper.write("abc")
    # Dropping the wrapper must close the stream once, with the pending
    # text already written to it.
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002378
def test_override_destructor(self):
    # Each overridden method logs a distinct marker so the call order on
    # collection can be checked: __del__ (1), close (2), flush (3).
    calls = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            try:
                parent_del = super().__del__
            except AttributeError:
                pass
            else:
                parent_del()
        def close(self):
            calls.append(2)
            super().close()
        def flush(self):
            calls.append(3)
            super().flush()
    stream = self.BytesIO()
    wrapper = MyTextIO(stream, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2401
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def trigger():
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, trigger)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception OSError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00002416
Guido van Rossum9b76da62007-04-11 01:09:03 +00002417 # Systematic tests of the text I/O API
2418
def test_basic_io(self):
    # Round-trip a short string through support.TESTFN for a range of
    # _CHUNK_SIZE values and encodings, checking write/read/seek/tell.
    # Both files are opened in "with" blocks so that a failing assertion
    # cannot leave support.TESTFN open.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # utf-16-be and utf-16-le are not exercised here.
        for enc in "ascii", "latin-1", "utf-8":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position at end of file; reused below as a seek target.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2447
def multi_line_test(self, f, enc):
    # Empty the file, write lines of various lengths while recording the
    # tell() value before each one, then read them back and check that
    # both the positions and the contents match.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build `size` characters.
        body = "".join(sample[i % len(sample)] for i in range(size))
        line = body + "\n"
        written.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    read_back = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        read_back.append((pos, line))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002469
def test_telling(self):
    # The file is opened in a "with" block so that a failing assertion
    # cannot leave support.TESTFN open.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        # Record the position before and after each of two written lines.
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        # readline() must land on exactly the recorded positions.
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        # tell() is expected to fail while iterating over the file ...
        for line in f:
            self.assertEqual(line, "\xff\n")
            self.assertRaises(OSError, f.tell)
        # ... and to work again once the iteration is finished.
        self.assertEqual(f.tell(), p2)
2489
def test_seeking(self):
    # The ASCII prefix is two characters shorter than the value returned
    # by _default_chunk_size(), and is followed by a multi-byte character.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as f:
        f.write(line * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002506
def test_seeking_too(self):
    # Regression test for a specific bug
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as writer:
        writer.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        reader._CHUNK_SIZE  # Just test that it exists
        reader._CHUNK_SIZE = 2
        # The test passes as long as neither call raises.
        reader.readline()
        reader.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002517
def test_seek_and_tell(self):
    #Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a "with" block so that a failing
        # assertion cannot leave support.TESTFN open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1): # seek positions
            for j in [1, 5, len(decoded) - i]: # read lengths
                with self.open(support.TESTFN, encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must replay the same data.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for case, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(case)

        # Position each test case so that it crosses a chunk boundary.
        for case, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(case)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + case, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002564
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002565 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002566 data = "1234567890"
2567 tests = ("utf-16",
2568 "utf-16-le",
2569 "utf-16-be",
2570 "utf-32",
2571 "utf-32-le",
2572 "utf-32-be")
2573 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002574 buf = self.BytesIO()
2575 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002576 # Check if the BOM is written only once (see issue1753).
2577 f.write(data)
2578 f.write(data)
2579 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002580 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002581 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002582 self.assertEqual(f.read(), data * 2)
2583 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002584
Benjamin Petersona1b49012009-03-31 23:11:32 +00002585 def test_unreadable(self):
2586 class UnReadable(self.BytesIO):
2587 def readable(self):
2588 return False
2589 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002590 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002591
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002592 def test_read_one_by_one(self):
2593 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002594 reads = ""
2595 while True:
2596 c = txt.read(1)
2597 if not c:
2598 break
2599 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002600 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002601
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002602 def test_readlines(self):
2603 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2604 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2605 txt.seek(0)
2606 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2607 txt.seek(0)
2608 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2609
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002610 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002611 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002612 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002613 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002614 reads = ""
2615 while True:
2616 c = txt.read(128)
2617 if not c:
2618 break
2619 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002620 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002621
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002622 def test_writelines(self):
2623 l = ['ab', 'cd', 'ef']
2624 buf = self.BytesIO()
2625 txt = self.TextIOWrapper(buf)
2626 txt.writelines(l)
2627 txt.flush()
2628 self.assertEqual(buf.getvalue(), b'abcdef')
2629
2630 def test_writelines_userlist(self):
2631 l = UserList(['ab', 'cd', 'ef'])
2632 buf = self.BytesIO()
2633 txt = self.TextIOWrapper(buf)
2634 txt.writelines(l)
2635 txt.flush()
2636 self.assertEqual(buf.getvalue(), b'abcdef')
2637
2638 def test_writelines_error(self):
2639 txt = self.TextIOWrapper(self.BytesIO())
2640 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2641 self.assertRaises(TypeError, txt.writelines, None)
2642 self.assertRaises(TypeError, txt.writelines, b'abc')
2643
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002644 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002645 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002646
2647 # read one char at a time
2648 reads = ""
2649 while True:
2650 c = txt.read(1)
2651 if not c:
2652 break
2653 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002654 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002655
2656 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002657 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002658 txt._CHUNK_SIZE = 4
2659
2660 reads = ""
2661 while True:
2662 c = txt.read(4)
2663 if not c:
2664 break
2665 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002666 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002667
2668 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002669 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002670 txt._CHUNK_SIZE = 4
2671
2672 reads = txt.read(4)
2673 reads += txt.read(4)
2674 reads += txt.readline()
2675 reads += txt.readline()
2676 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002677 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002678
2679 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002680 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002681 txt._CHUNK_SIZE = 4
2682
2683 reads = txt.read(4)
2684 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002685 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002686
2687 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002688 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002689 txt._CHUNK_SIZE = 4
2690
2691 reads = txt.read(4)
2692 pos = txt.tell()
2693 txt.seek(0)
2694 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002695 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002696
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002697 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002698 buffer = self.BytesIO(self.testdata)
2699 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002700
2701 self.assertEqual(buffer.seekable(), txt.seekable())
2702
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        # Create the file; its bytes must equal a one-shot encode().
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        # Append to it; the result must equal encoding the whole text once.
        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002717
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_pos = writer.tell()
        # Write at the recorded end position first, then overwrite the
        # text at the start of the file.
        with self.open(path, 'r+', encoding=charset) as updater:
            updater.seek(end_pos)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002732
def test_seek_append_bom(self):
    # Same test, but first seek to the start and then to the end
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
        with self.open(path, 'a', encoding=charset) as appender:
            appender.seek(0)
            appender.seek(0, self.SEEK_END)
            appender.write('xxx')
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'aaaxxx'.encode(charset))
2745
def test_errors_property(self):
    # The ``errors`` attribute is "strict" when no handler is given and
    # otherwise echoes the handler passed to open().
    with self.open(support.TESTFN, "w") as default_handler:
        self.assertEqual(default_handler.errors, "strict")
    with self.open(support.TESTFN, "w", errors="replace") as custom_handler:
        self.assertEqual(custom_handler.errors, "replace")
2751
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data.  Twenty threads
    # each write one distinct line to the same line-buffered text file;
    # every line must then appear in the file exactly once.
    thread_count = 20
    go = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as shared:
        def writer(index):
            line = "Thread%03d\n" % index
            go.wait()  # block until the shared event is set
            shared.write(line)
        workers = []
        for index in range(thread_count):
            workers.append(threading.Thread(target=writer, args=(index,)))
        with support.start_threads(workers, go.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as reader:
        written = reader.read()
        for index in range(thread_count):
            occurrences = written.count("Thread%03d\n" % index)
            self.assertEqual(occurrences, 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002770
def test_flush_error_on_close(self):
    # close() must propagate a flush() failure, must still close both the
    # text file and its buffer, and must call flush() before closing.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    state_at_flush = []
    def failing_flush():
        # Record whether either layer was already closed when flush() ran.
        state_at_flush[:] = [txt.closed, txt.buffer.closed]
        raise OSError()
    txt.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        txt.close()
    self.assertTrue(txt.closed)
    self.assertTrue(txt.buffer.closed)
    self.assertTrue(state_at_flush)  # flush() called
    text_was_closed, buffer_was_closed = state_at_flush
    self.assertFalse(text_was_closed)  # flush() called before file closed
    self.assertFalse(buffer_was_closed)
    txt.flush = lambda: None  # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00002787
def test_close_error_on_close(self):
    # When flush() and the buffer's close() both fail inside close(), the
    # close() error must propagate with the flush() error chained as its
    # __context__, and the wrapper must not report itself as closed.
    buffer = self.BytesIO(self.testdata)
    def bad_flush():
        raise OSError('flush')
    def bad_close():
        raise OSError('close')
    buffer.close = bad_close
    txt = self.TextIOWrapper(buffer, encoding="ascii")
    txt.flush = bad_flush
    with self.assertRaises(OSError) as err: # exception not swallowed
        txt.close()
    self.assertEqual(err.exception.args, ('close',))
    self.assertIsInstance(err.exception.__context__, OSError)
    self.assertEqual(err.exception.__context__.args, ('flush',))
    self.assertFalse(txt.closed)

    # The wrapper is still open at this point, so finalizing it would call
    # close() again (io.IOBase.__del__ calls close()) and hit the failing
    # stubs a second time.  Replace them with no-ops, as
    # test_flush_error_on_close does for its own stub.
    buffer.close = lambda: None
    txt.flush = lambda: None
2803
def test_nonnormalized_close_error_on_close(self):
    # Issue #21677
    # Same scenario as a failing flush() plus a failing buffer close(),
    # except that each stub fails by referencing an undefined name, so the
    # exceptions involved are NameErrors raised by the interpreter.
    buffer = self.BytesIO(self.testdata)
    def bad_flush():
        raise non_existing_flush  # deliberately undefined -> NameError
    def bad_close():
        raise non_existing_close  # deliberately undefined -> NameError
    buffer.close = bad_close
    txt = self.TextIOWrapper(buffer, encoding="ascii")
    txt.flush = bad_flush
    with self.assertRaises(NameError) as err: # exception not swallowed
        txt.close()
    self.assertIn('non_existing_close', str(err.exception))
    self.assertIsInstance(err.exception.__context__, NameError)
    self.assertIn('non_existing_flush', str(err.exception.__context__))
    self.assertFalse(txt.closed)

    # The wrapper is still open at this point, so finalizing it would call
    # close() again (io.IOBase.__del__ calls close()) and hit the failing
    # stubs a second time.  Replace them with no-ops.
    buffer.close = lambda: None
    txt.flush = lambda: None
2820
def test_multi_close(self):
    # Closing repeatedly is allowed; flushing a closed wrapper is not.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    for _ in range(3):
        wrapper.close()
    with self.assertRaises(ValueError):
        wrapper.flush()
2827
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must raise
    # UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    unsupported = self.UnsupportedOperation
    self.assertRaises(unsupported, wrapper.tell)
    self.assertRaises(unsupported, wrapper.seek, 0)
2832
def test_readonly_attributes(self):
    # Assigning to the wrapper's ``buffer`` attribute must be rejected.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises(AttributeError):
        wrapper.buffer = replacement
2838
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads: a sized read, a line read, then iteration over the rest.
    head = wrapper.read(4)
    self.assertEqual(head, 'abcd')
    first_line = wrapper.readline()
    self.assertEqual(first_line, 'efghi\n')
    remaining = list(wrapper)
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
2849
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    # No flush() was called, yet everything written is on the raw object.
    delivered = b''.join(raw._write_stack)
    self.assertEqual(delivered, b'123\n45')
2859
def test_bufio_write_through(self):
    # Issue #21396: write_through=True doesn't force a flush()
    # on the underlying binary buffered object.
    flush_calls, write_calls = [], []

    class RecordingWriter(self.BufferedWriter):
        # Notes every flush() and write() call before delegating.
        def flush(self, *args, **kwargs):
            flush_calls.append(True)
            return super().flush(*args, **kwargs)

        def write(self, *args, **kwargs):
            write_calls.append(True)
            return super().write(*args, **kwargs)

    sink = self.BytesIO()
    payload = b"a"
    buffered = RecordingWriter(sink, len(payload) * 2)
    wrapper = self.TextIOWrapper(buffered, encoding='ascii',
                                 write_through=True)
    # write to the buffered io but don't overflow the buffer
    text = payload.decode('ascii')
    wrapper.write(text)

    # buffer.flush is not called with write_through=True
    self.assertFalse(flush_calls)
    # buffer.write *is* called with write_through=True
    self.assertTrue(write_calls)
    self.assertEqual(sink.getvalue(), b"")  # no flush

    write_calls.clear()  # reset
    wrapper.write(text * 10)  # total content is larger than bufio buffer
    self.assertTrue(write_calls)
    self.assertEqual(sink.getvalue(), payload * 11)  # all flushed
2891
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes.  Each call gets a
    # fresh wrapper around a StringIO, whose read() returns str.
    calls = (
        ("read", (1,)),
        ("readline", ()),
        ("read", ()),
    )
    for method_name, call_args in calls:
        wrapper = self.TextIOWrapper(self.StringIO('a'))
        method = getattr(wrapper, method_name)
        self.assertRaises(TypeError, method, *call_args)
2901
def test_illegal_decoder(self):
    # Issue #17106
    def _make_illegal_wrapper():
        # Bypass the early encoding check added in issue 20404 by marking
        # the "quopri" codec as a text encoding only while the wrapper is
        # being constructed.
        codec_info = codecs.lookup("quopri")
        codec_info._is_text_encoding = True
        try:
            wrapper = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                         newline='\n', encoding="quopri")
        finally:
            codec_info._is_text_encoding = False
        return wrapper

    # Crash when decoder returns non-string
    calls = (
        ("read", (1,)),
        ("readline", ()),
        ("read", ()),
    )
    for method_name, call_args in calls:
        wrapper = _make_illegal_wrapper()
        method = getattr(wrapper, method_name)
        self.assertRaises(TypeError, method, *call_args)
2921
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    #
    # Runs a child interpreter whose script creates a TextIOWrapper (with
    # the given keyword arguments) from inside a __del__ method, and
    # returns whatever assert_python_ok() returns for that run.
    template = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """
    script = template.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
2942
def test_create_at_shutdown_without_encoding(self):
    rc, out, err = self._check_create_at_shutdown()
    if not err:
        self.assertEqual("ok", out.decode().strip())
        return
    # Can error out with a RuntimeError if the module state
    # isn't found.
    self.assertIn(self.shutdown_error, err.decode())
2951
def test_create_at_shutdown_with_encoding(self):
    # With encoding and errors given explicitly, the child must print
    # "ok" and write nothing to stderr.
    options = {'encoding': 'utf-8', 'errors': 'strict'}
    rc, out, err = self._check_create_at_shutdown(**options)
    self.assertFalse(err)
    self.assertEqual("ok", out.decode().strip())
2957
def test_read_byteslike(self):
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    truncated = _to_memoryview(source.getvalue()).tobytes()
    expected_text = truncated.decode('utf-8')

    self.assertEqual(wrapper.read(200), expected_text)
2968
def test_issue22849(self):
    class F(object):
        # Minimal stand-in stream: it only answers the three capability
        # queries and initially has no tell().
        def readable(self):
            return True

        def writable(self):
            return True

        def seekable(self):
            return True

    attempts = 10
    for _ in range(attempts):
        try:
            self.TextIOWrapper(F(), encoding='utf-8')
        except Exception:
            # Deliberately ignored: this loop only needs the repeated
            # constructor calls to come back, whether or not they raise.
            pass

    # Give F a tell() and construct a wrapper once more.
    F.tell = lambda x: 0
    t = self.TextIOWrapper(F(), encoding='utf-8')
2983
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002984
class MemviewBytesIO(io.BytesIO):
    """BytesIO variant whose read() and read1() return memoryviews.

    The bytes produced by the parent class are passed through
    _to_memoryview() before being returned.
    """

    def read(self, len_):
        chunk = super().read(len_)
        return _to_memoryview(chunk)

    def read1(self, len_):
        chunk = super().read1(len_)
        return _to_memoryview(chunk)
2994
def _to_memoryview(buf):
    """Return a memoryview over an array of C ints built from *buf*.

    Only the longest prefix of *buf* whose length is a multiple of the
    array item size is used; any trailing bytes are dropped.
    """
    ints = array.array('i')
    whole_items = len(buf) // ints.itemsize
    ints.frombytes(buf[:whole_items * ints.itemsize])
    return memoryview(ints)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003002
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003003
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the TextIOWrapper tests with ``io`` bound to the ``io`` module
    # and adds checks that only apply to that implementation.
    io = io
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        # After a failed __init__() call the wrapper's read() must raise
        # ValueError, and repr() of a never-initialised wrapper must raise.
        source = self.BytesIO(b"\xc3\xa9\n\n")
        reader = self.BufferedReader(source, 1000)
        wrapper = self.TextIOWrapper(reader)
        self.assertRaises(TypeError, wrapper.__init__, reader, newline=42)
        self.assertRaises(ValueError, wrapper.read)
        self.assertRaises(ValueError, wrapper.__init__, reader,
                          newline='xyzzy')
        self.assertRaises(ValueError, wrapper.read)

        wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, wrapper)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw_file = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(raw_file)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Put the wrapper in a reference cycle with itself.
            wrapper.x = wrapper
            watcher = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(watcher(), watcher)
        with self.open(support.TESTFN, "rb") as reader:
            self.assertEqual(reader.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        rounds = 1000
        for _ in range(rounds):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
3050
3051
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the TextIOWrapper tests with ``io`` bound to ``pyio``.
    io = pyio
    # An earlier expected message, kept for reference:
    #shutdown_error = "LookupError: unknown encoding: ascii"
    shutdown_error = "TypeError: 'NoneType' object is not iterable"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003056
3057
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Shared tests for IncrementalNewlineDecoder.  The class under test is
    # read from ``self.IncrementalNewlineDecoder``, which is not defined
    # here and is expected to be supplied from outside this class.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def expect(chunk, text, **options):
            # We exercise getstate() / setstate() as well as decode():
            # the same chunk is decoded twice from the same saved state.
            saved = decoder.getstate()
            first = decoder.decode(chunk, **options)
            self.assertEqual(first, text)
            decoder.setstate(saved)
            second = decoder.decode(chunk, **options)
            self.assertEqual(second, text)

        def expect_all(steps):
            for chunk, text in steps:
                expect(chunk, text)

        # A three-byte character fed whole, then byte by byte (twice),
        # then left incomplete.
        expect_all([
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            (b'\xe8', ""),
        ])
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        expect(b'\n', "\n")
        expect(b'\r', "")
        expect(b'', "\n", final=True)
        expect(b'\r', "\n", final=True)

        expect_all([
            (b'\r', ""),
            (b'a', "\na"),
            (b'\r\r\n', "\n\n"),
            (b'\r', ""),
            (b'\r', "\n"),
            (b'\na', "\na"),
            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        ])

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to *decoder* one unit at a time and check both the
        # translated output and the ``newlines`` attribute along the way.
        # With an encoding the units are single encoded bytes; with
        # encoding=None they are single characters.
        pieces = []
        if encoding is None:
            encoder = None
            def feed(text):
                # Decode one char at a time
                for char in text:
                    pieces.append(decoder.decode(char))
        else:
            encoder = codecs.getincrementalencoder(encoding)()
            def feed(text):
                # Decode one byte at a time
                for byte in encoder.encode(text):
                    pieces.append(decoder.decode(bytes([byte])))

        self.assertEqual(decoder.newlines, None)
        expectations = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in expectations:
            feed(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        feed("abc\r")
        self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")

        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(decoder, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def verify(dec):
            # U+0D00 and U+0A00 must pass through unchanged and must not
            # be recorded as newlines.
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)

        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            verify(dec)
3163
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the shared IncrementalNewlineDecoder tests unchanged.

    Nothing is overridden here; judging by the name this is the variant
    for the C implementation, configured outside this class.
    """
3166
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the shared IncrementalNewlineDecoder tests unchanged.

    Nothing is overridden here; judging by the name this is the variant
    for the Python implementation, configured outside this class.
    """
Antoine Pitrou180a3362008-12-14 16:36:46 +00003169
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003170
Guido van Rossum01a27522007-03-07 01:00:12 +00003171# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003172
Guido van Rossum5abbf752007-08-27 17:39:33 +00003173class MiscIOTest(unittest.TestCase):
3174
Barry Warsaw40e82462008-11-20 20:14:50 +00003175 def tearDown(self):
3176 support.unlink(support.TESTFN)
3177
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003178 def test___all__(self):
3179 for name in self.io.__all__:
3180 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003181 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003182 if name == "open":
3183 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003184 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003185 self.assertTrue(issubclass(obj, Exception), name)
3186 elif not name.startswith("SEEK_"):
3187 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003188
Barry Warsaw40e82462008-11-20 20:14:50 +00003189 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003190 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003191 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003192 f.close()
3193
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003194 with support.check_warnings(('', DeprecationWarning)):
3195 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003196 self.assertEqual(f.name, support.TESTFN)
3197 self.assertEqual(f.buffer.name, support.TESTFN)
3198 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3199 self.assertEqual(f.mode, "U")
3200 self.assertEqual(f.buffer.mode, "rb")
3201 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003202 f.close()
3203
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003204 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003205 self.assertEqual(f.mode, "w+")
3206 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3207 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003208
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003209 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003210 self.assertEqual(g.mode, "wb")
3211 self.assertEqual(g.raw.mode, "wb")
3212 self.assertEqual(g.name, f.fileno())
3213 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003214 f.close()
3215 g.close()
3216
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003217 def test_io_after_close(self):
3218 for kwargs in [
3219 {"mode": "w"},
3220 {"mode": "wb"},
3221 {"mode": "w", "buffering": 1},
3222 {"mode": "w", "buffering": 2},
3223 {"mode": "wb", "buffering": 0},
3224 {"mode": "r"},
3225 {"mode": "rb"},
3226 {"mode": "r", "buffering": 1},
3227 {"mode": "r", "buffering": 2},
3228 {"mode": "rb", "buffering": 0},
3229 {"mode": "w+"},
3230 {"mode": "w+b"},
3231 {"mode": "w+", "buffering": 1},
3232 {"mode": "w+", "buffering": 2},
3233 {"mode": "w+b", "buffering": 0},
3234 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003235 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003236 f.close()
3237 self.assertRaises(ValueError, f.flush)
3238 self.assertRaises(ValueError, f.fileno)
3239 self.assertRaises(ValueError, f.isatty)
3240 self.assertRaises(ValueError, f.__iter__)
3241 if hasattr(f, "peek"):
3242 self.assertRaises(ValueError, f.peek, 1)
3243 self.assertRaises(ValueError, f.read)
3244 if hasattr(f, "read1"):
3245 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003246 if hasattr(f, "readall"):
3247 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003248 if hasattr(f, "readinto"):
3249 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003250 if hasattr(f, "readinto1"):
3251 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003252 self.assertRaises(ValueError, f.readline)
3253 self.assertRaises(ValueError, f.readlines)
3254 self.assertRaises(ValueError, f.seek, 0)
3255 self.assertRaises(ValueError, f.tell)
3256 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003257 self.assertRaises(ValueError, f.write,
3258 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003259 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003260 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003261
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003262 def test_blockingioerror(self):
3263 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003264 class C(str):
3265 pass
3266 c = C("")
3267 b = self.BlockingIOError(1, c)
3268 c.b = b
3269 b.c = c
3270 wr = weakref.ref(c)
3271 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003272 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003273 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003274
3275 def test_abcs(self):
3276 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003277 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3278 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3279 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3280 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003281
3282 def _check_abc_inheritance(self, abcmodule):
3283 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003284 self.assertIsInstance(f, abcmodule.IOBase)
3285 self.assertIsInstance(f, abcmodule.RawIOBase)
3286 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3287 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003288 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003289 self.assertIsInstance(f, abcmodule.IOBase)
3290 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3291 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3292 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003293 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003294 self.assertIsInstance(f, abcmodule.IOBase)
3295 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3296 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3297 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003298
3299 def test_abc_inheritance(self):
3300 # Test implementations inherit from their respective ABCs
3301 self._check_abc_inheritance(self)
3302
3303 def test_abc_inheritance_official(self):
3304 # Test implementations inherit from the official ABCs of the
3305 # baseline "io" module.
3306 self._check_abc_inheritance(io)
3307
Antoine Pitroue033e062010-10-29 10:38:18 +00003308 def _check_warn_on_dealloc(self, *args, **kwargs):
3309 f = open(*args, **kwargs)
3310 r = repr(f)
3311 with self.assertWarns(ResourceWarning) as cm:
3312 f = None
3313 support.gc_collect()
3314 self.assertIn(r, str(cm.warning.args[0]))
3315
3316 def test_warn_on_dealloc(self):
3317 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3318 self._check_warn_on_dealloc(support.TESTFN, "wb")
3319 self._check_warn_on_dealloc(support.TESTFN, "w")
3320
3321 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3322 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003323 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003324 for fd in fds:
3325 try:
3326 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003327 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003328 if e.errno != errno.EBADF:
3329 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003330 self.addCleanup(cleanup_fds)
3331 r, w = os.pipe()
3332 fds += r, w
3333 self._check_warn_on_dealloc(r, *args, **kwargs)
3334 # When using closefd=False, there's no warning
3335 r, w = os.pipe()
3336 fds += r, w
3337 with warnings.catch_warnings(record=True) as recorded:
3338 open(r, *args, closefd=False, **kwargs)
3339 support.gc_collect()
3340 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00003341
3342 def test_warn_on_dealloc_fd(self):
3343 self._check_warn_on_dealloc_fd("rb", buffering=0)
3344 self._check_warn_on_dealloc_fd("rb")
3345 self._check_warn_on_dealloc_fd("r")
3346
3347
Antoine Pitrou243757e2010-11-05 21:15:39 +00003348 def test_pickling(self):
3349 # Pickling file objects is forbidden
3350 for kwargs in [
3351 {"mode": "w"},
3352 {"mode": "wb"},
3353 {"mode": "wb", "buffering": 0},
3354 {"mode": "r"},
3355 {"mode": "rb"},
3356 {"mode": "rb", "buffering": 0},
3357 {"mode": "w+"},
3358 {"mode": "w+b"},
3359 {"mode": "w+b", "buffering": 0},
3360 ]:
3361 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3362 with self.open(support.TESTFN, **kwargs) as f:
3363 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3364
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003365 def test_nonblock_pipe_write_bigbuf(self):
3366 self._test_nonblock_pipe_write(16*1024)
3367
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003368 def test_nonblock_pipe_write_smallbuf(self):
3369 self._test_nonblock_pipe_write(1024)
3370
@unittest.skipUnless(hasattr(os, 'set_blocking'),
                     'os.set_blocking() required for this test')
def _test_nonblock_pipe_write(self, bufsize):
    """Round-trip data through a non-blocking pipe via buffered files.

    Both ends of an os.pipe() are switched to non-blocking mode and
    wrapped with self.open(), using *bufsize* as the ``buffering``
    argument.  Messages are written until the writer raises
    self.BlockingIOError; the reader then drains what it can.  Once the
    writer has been flushed, the concatenation of everything recorded as
    sent must equal the concatenation of everything received, and both
    file objects must be closed on leaving the ``with`` block.
    """
    sent = []
    received = []
    r, w = os.pipe()
    os.set_blocking(r, False)
    os.set_blocking(w, False)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes.  For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
    wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

    with rf, wf:
        for N in 9999, 73, 7574:
            try:
                i = 0
                while True:
                    # N copies of one lowercase ASCII letter, cycling a-z.
                    msg = bytes([i % 26 + 97]) * N
                    sent.append(msg)
                    wf.write(msg)
                    i += 1

            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                # Only the accepted prefix of the last message counts
                # as sent.
                sent[-1] = sent[-1][:e.characters_written]
                received.append(rf.read())
                msg = b'BLOCKED'
                wf.write(msg)
                sent.append(msg)

        # Keep flushing, draining the read end after every EAGAIN, until
        # the writer's buffer is empty.
        while True:
            try:
                wf.flush()
                break
            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                self.assertEqual(e.characters_written, 0)
                received.append(rf.read())

        # Collect whatever is left; read() returning None ends the loop.
        received += iter(rf.read, None)

    sent_data = b''.join(sent)
    received_data = b''.join(received)
    # assertEqual (rather than assertTrue(a == b)) reports the differing
    # values when the round trip fails.
    self.assertEqual(sent_data, received_data)
    self.assertTrue(wf.closed)
    self.assertTrue(rf.closed)
3422
def test_create_fail(self):
    # 'x' mode fails if file is existing
    path = support.TESTFN
    with self.open(path, 'w'):
        pass
    with self.assertRaises(FileExistsError):
        self.open(path, 'x')
3428
def test_create_writes(self):
    # 'x' mode opens for writing
    path = support.TESTFN
    with self.open(path, 'xb') as writer:
        writer.write(b"spam")
    with self.open(path, 'rb') as reader:
        contents = reader.read()
        self.assertEqual(b"spam", contents)
3435
def test_open_allargs(self):
    # there used to be a buffer overflow in the parser for rawmode
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, 'rwax+')
3439
3440
class CMiscIOTest(MiscIOTest):
    # The module-level name ``io`` is what ``self.io`` resolves to here.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        class OversizedReader(self.io.BufferedIOBase):
            # read() always returns 10**6 bytes, whatever was requested.
            def read(self, n=-1):
                return b'x' * 10**6
        reader = OversizedReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        script = """if 1:
            import sys
            import time
            import threading

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        result, _ = run_python_until_end("-c", script)
        stderr_text = result.err.decode()
        if result.rc == 0:
            # Clean exit: stderr may only contain the '.' and '!' markers.
            self.assertFalse(stderr_text.strip('.!'))
            return
        # Failure: should be a fatal error
        expected = ("Fatal Python error: could not acquire lock "
                    "for <_io.BufferedWriter name='<{stream_name}>'> "
                    "at interpreter shutdown, possibly due to "
                    "daemon threads").format(stream_name=stream_name)
        self.assertIn(expected, stderr_text)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock(stream_name='stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock(stream_name='stderr')
3494
3495
class PyMiscIOTest(MiscIOTest):
    # The module-level name ``pyio`` is what ``self.io`` resolves to here.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003498
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003499
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    # Checks how file objects opened through ``self.io.open()`` behave
    # when SIGALRM arrives during a read or a write.  ``self.io`` is not
    # defined here; CSignalsTest and PySignalsTest provide it.

    def setUp(self):
        # Install the raising handler and remember the previous one.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raises ZeroDivisionError from the signal handler.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data repeated to build the payload, *bytes*
        is the byte string whose first two bytes are expected on the read
        end, and *fdopen_kwargs* are passed to ``self.io.open()``.
        """
        read_results = []
        def _read():
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block so that the cleanup below cannot
        # raise NameError (hiding the real error) if open() fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed below.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    # Issue #22331: The test hangs on FreeBSD 7.2
    @support.requires_freebsd_version(8)
    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check a write() issued from a signal handler during a write().

        The SIGALRM handler writes *data* to the same file object and then
        raises ZeroDivisionError.  The main loop must end with either that
        ZeroDivisionError or a RuntimeError whose message starts with
        "reentrant call".  *fdopen_kwargs* are passed to ``self.io.open()``.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel any alarm still pending so that it cannot fire after
            # this check has returned.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to ``str`` for the
        comparison; *fdopen_kwargs* are passed to ``self.io.open()``.
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that the cleanup below cannot
        # raise NameError (hiding the real error) if open() fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel any alarm still pending so that it cannot fire after
            # this check has returned.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is the one-element unit repeated to build the payload;
        *fdopen_kwargs* are passed to ``self.io.open()``.
        """
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            nonlocal error
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand the failure over to the main thread.
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that the cleanup below cannot
        # raise NameError (hiding the real error) if open() fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel any alarm still pending so that it cannot fire after
            # this check has returned.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3707
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003708
class CSignalsTest(SignalsTest):
    # The module-level name ``io`` is what ``self.io`` resolves to here.
    io = io
3711
class PySignalsTest(SignalsTest):
    # The module-level name ``pyio`` is what ``self.io`` resolves to here.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3719
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003720
def load_tests(*args):
    """Build and return the test suite for this module.

    Before the suite is assembled, every test class whose name starts
    with "C" receives the public names of ``io`` (plus the "C"-prefixed
    mock classes) as class attributes, and every class whose name starts
    with "Py" receives the corresponding ``pyio`` names (plus the
    "Py"-prefixed mocks).  The positional arguments are accepted but not
    used.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated since Python 3.11 and removed in
    # 3.13; TestLoader.loadTestsFromTestCase() is the supported way to
    # collect the test methods of a class.
    loader = unittest.defaultTestLoader
    suite = unittest.TestSuite(
        [loader.loadTestsFromTestCase(test) for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003756
if __name__ == "__main__":
    # Hand control to unittest when this file is run as a script.
    unittest.main()