blob: 02f74279a1ce9f4345f0e284a4a0c9c17b33fa55 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)
12
################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Antoine Pitrou25f85d42015-04-13 19:41:47 +020038from test.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size.

    Opens this very file in text mode only to obtain a text wrapper object
    and returns its ``_CHUNK_SIZE`` attribute.
    """
    # latin-1 can decode any byte sequence, so opening never fails on content.
    with open(__file__, "r", encoding="latin-1") as f:
        return f._CHUNK_SIZE
52
53
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    ``read_stack`` is a sequence of items served in order by readinto():
    a bytes chunk is copied into the caller's buffer (and split across
    several calls when it is larger than the buffer), while a ``None`` item
    is consumed and returned as ``None``.  Everything passed to write() is
    recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Total number of readinto() calls.
        self._reads = 0
        # Number of readinto() calls made after the stack was exhausted.
        self._extraneous_reads = 0

    def write(self, b):
        # Store an immutable copy so later mutation of b cannot alter it.
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Fill *buf* from the head of the read stack.

        Returns the number of bytes copied, ``None`` when the head item is
        ``None``, or 0 when the stack is empty.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            # The whole chunk fits: consume it.
            del self._read_stack[0]
            buf[:n] = data
            return n
        else:
            # Serve what fits and keep the remainder for the next call.
            buf[:] = data[:max_len]
            self._read_stack[0] = data[max_len:]
            return max_len

    def truncate(self, pos=None):
        return pos


class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    pass


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    pass
115
116
class MockRawIO(MockRawIOWithoutRead):
    """Mock raw stream whose read() pops whole items off the read stack."""

    def read(self, n=None):
        # n is accepted for interface compatibility but ignored: each call
        # returns the next stack item in full.
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Stack exhausted: count the extra call and signal EOF.
            self._extraneous_reads += 1
            return b""


class CMockRawIO(MockRawIO, io.RawIOBase):
    pass


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000132
Guido van Rossuma9e20242007-03-08 00:43:48 +0000133
class MisbehavedRawIO(MockRawIO):
    """Mock raw stream that deliberately returns inconsistent results."""

    def write(self, b):
        # Report twice as many bytes as were actually recorded.
        return super().write(b) * 2

    def read(self, n=None):
        # Return the popped item repeated twice.
        return super().read(n) * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Perform the real read, then report a count larger than the buffer.
        super().readinto(buf)
        return len(buf) * 5


class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    pass


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    pass
156
157
class CloseFailureIO(MockRawIO):
    """Mock raw stream whose first close() call raises OSError."""

    closed = 0

    def close(self):
        # Mark the stream closed *before* raising, so that any later close()
        # call is a silent no-op.
        if not self.closed:
            self.closed = 1
            raise OSError


class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    pass


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    pass
171
172
class MockFileIO:
    """Mixin that records the result size of every read()/readinto() call.

    ``read_history`` receives, for each read(), the length of the returned
    data (or ``None`` when the underlying read returned ``None``), and for
    each readinto() the value it returned.
    """

    def __init__(self, data):
        # Set up the history first so it exists before the base initializer
        # runs.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        res = super().read(n)
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        res = super().readinto(b)
        self.read_history.append(res)
        return res


class CMockFileIO(MockFileIO, io.BytesIO):
    pass


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    pass
194
195
class MockUnseekableIO:
    """Mixin that makes a stream report itself as non-seekable.

    Concrete subclasses must provide an ``UnsupportedOperation`` class
    attribute; seek() and tell() raise it.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        raise self.UnsupportedOperation("not seekable")

    def tell(self, *args):
        raise self.UnsupportedOperation("not seekable")


class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    UnsupportedOperation = pyio.UnsupportedOperation
211
212
class MockNonBlockWriterIO:
    """Mock raw writer that can simulate a write that would block.

    After block_on(char), a write() whose data contains *char* is cut short
    just before it; if *char* is the very first byte, the blocker is cleared
    and write() returns ``None`` without recording anything.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything recorded so far and clear the record."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        b = bytes(b)
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                # Blocker not present: fall through to a full write.
                pass
            else:
                if n > 0:
                    # write data up to the first blocker
                    self._write_stack.append(b[:n])
                    return n
                else:
                    # cancel blocker and indicate would block
                    self._blocker_char = None
                    return None
        self._write_stack.append(b)
        return len(b)


class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    BlockingIOError = pyio.BlockingIOError
262
Guido van Rossuma9e20242007-03-08 00:43:48 +0000263
Guido van Rossum28524c72007-02-27 05:47:44 +0000264class IOTest(unittest.TestCase):
265
Neal Norwitze7789b12008-03-24 06:18:09 +0000266 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000267 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000268
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000269 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000270 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000271
Guido van Rossum28524c72007-02-27 05:47:44 +0000272 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000273 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000274 f.truncate(0)
275 self.assertEqual(f.tell(), 5)
276 f.seek(0)
277
278 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000279 self.assertEqual(f.seek(0), 0)
280 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000281 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000282 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000283 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000284 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000285 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000286 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000287 self.assertEqual(f.seek(-1, 2), 13)
288 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000289
Guido van Rossum87429772007-04-10 21:06:59 +0000290 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000291 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000292 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000293
Guido van Rossum9b76da62007-04-11 01:09:03 +0000294 def read_ops(self, f, buffered=False):
295 data = f.read(5)
296 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000297 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000298 self.assertEqual(f.readinto(data), 5)
299 self.assertEqual(data, b" worl")
300 self.assertEqual(f.readinto(data), 2)
301 self.assertEqual(len(data), 5)
302 self.assertEqual(data[:2], b"d\n")
303 self.assertEqual(f.seek(0), 0)
304 self.assertEqual(f.read(20), b"hello world\n")
305 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000306 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000307 self.assertEqual(f.seek(-6, 2), 6)
308 self.assertEqual(f.read(5), b"world")
309 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000310 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000311 self.assertEqual(f.seek(-6, 1), 5)
312 self.assertEqual(f.read(5), b" worl")
313 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000314 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 if buffered:
316 f.seek(0)
317 self.assertEqual(f.read(), b"hello world\n")
318 f.seek(6)
319 self.assertEqual(f.read(), b"world\n")
320 self.assertEqual(f.read(), b"")
321
Guido van Rossum34d69e52007-04-10 20:08:41 +0000322 LARGE = 2**31
323
Guido van Rossum53807da2007-04-10 19:01:47 +0000324 def large_file_ops(self, f):
325 assert f.readable()
326 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000327 self.assertEqual(f.seek(self.LARGE), self.LARGE)
328 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000329 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000330 self.assertEqual(f.tell(), self.LARGE + 3)
331 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000332 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000333 self.assertEqual(f.tell(), self.LARGE + 2)
334 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000335 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000336 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000337 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
338 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000339 self.assertEqual(f.read(2), b"x")
340
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000341 def test_invalid_operations(self):
342 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000343 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000344 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000345 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000346 self.assertRaises(exc, fp.read)
347 self.assertRaises(exc, fp.readline)
348 with self.open(support.TESTFN, "wb", buffering=0) as fp:
349 self.assertRaises(exc, fp.read)
350 self.assertRaises(exc, fp.readline)
351 with self.open(support.TESTFN, "rb", buffering=0) as fp:
352 self.assertRaises(exc, fp.write, b"blah")
353 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000354 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000355 self.assertRaises(exc, fp.write, b"blah")
356 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000357 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000358 self.assertRaises(exc, fp.write, "blah")
359 self.assertRaises(exc, fp.writelines, ["blah\n"])
360 # Non-zero seeking from current or end pos
361 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
362 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000363
Antoine Pitrou13348842012-01-29 18:36:34 +0100364 def test_open_handles_NUL_chars(self):
365 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300366 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
367 self.assertRaises(ValueError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100368
Guido van Rossum28524c72007-02-27 05:47:44 +0000369 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000370 with self.open(support.TESTFN, "wb", buffering=0) as f:
371 self.assertEqual(f.readable(), False)
372 self.assertEqual(f.writable(), True)
373 self.assertEqual(f.seekable(), True)
374 self.write_ops(f)
375 with self.open(support.TESTFN, "rb", buffering=0) as f:
376 self.assertEqual(f.readable(), True)
377 self.assertEqual(f.writable(), False)
378 self.assertEqual(f.seekable(), True)
379 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000380
Guido van Rossum87429772007-04-10 21:06:59 +0000381 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000382 with self.open(support.TESTFN, "wb") as f:
383 self.assertEqual(f.readable(), False)
384 self.assertEqual(f.writable(), True)
385 self.assertEqual(f.seekable(), True)
386 self.write_ops(f)
387 with self.open(support.TESTFN, "rb") as f:
388 self.assertEqual(f.readable(), True)
389 self.assertEqual(f.writable(), False)
390 self.assertEqual(f.seekable(), True)
391 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000392
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000393 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000394 with self.open(support.TESTFN, "wb") as f:
395 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
396 with self.open(support.TESTFN, "rb") as f:
397 self.assertEqual(f.readline(), b"abc\n")
398 self.assertEqual(f.readline(10), b"def\n")
399 self.assertEqual(f.readline(2), b"xy")
400 self.assertEqual(f.readline(4), b"zzy\n")
401 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000402 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000403 self.assertRaises(TypeError, f.readline, 5.3)
404 with self.open(support.TESTFN, "r") as f:
405 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000406
Guido van Rossum28524c72007-02-27 05:47:44 +0000407 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000408 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000409 self.write_ops(f)
410 data = f.getvalue()
411 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000412 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000413 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000414
Guido van Rossum53807da2007-04-10 19:01:47 +0000415 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000416 # On Windows and Mac OSX this test comsumes large resources; It takes
417 # a long time to build the >2GB file and takes >2GB of disk space
418 # therefore the resource must be enabled to run this test.
419 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600420 support.requires(
421 'largefile',
422 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000423 with self.open(support.TESTFN, "w+b", 0) as f:
424 self.large_file_ops(f)
425 with self.open(support.TESTFN, "w+b") as f:
426 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000427
428 def test_with_open(self):
429 for bufsize in (0, 1, 100):
430 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000431 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000432 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000433 self.assertEqual(f.closed, True)
434 f = None
435 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000436 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000437 1/0
438 except ZeroDivisionError:
439 self.assertEqual(f.closed, True)
440 else:
441 self.fail("1/0 didn't raise an exception")
442
Antoine Pitrou08838b62009-01-21 00:55:13 +0000443 # issue 5008
444 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000445 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000446 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000447 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000448 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000449 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000450 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000451 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000452 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000453
Guido van Rossum87429772007-04-10 21:06:59 +0000454 def test_destructor(self):
455 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000456 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000457 def __del__(self):
458 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000459 try:
460 f = super().__del__
461 except AttributeError:
462 pass
463 else:
464 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000465 def close(self):
466 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000467 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000468 def flush(self):
469 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000470 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000471 with support.check_warnings(('', ResourceWarning)):
472 f = MyFileIO(support.TESTFN, "wb")
473 f.write(b"xxx")
474 del f
475 support.gc_collect()
476 self.assertEqual(record, [1, 2, 3])
477 with self.open(support.TESTFN, "rb") as f:
478 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000479
480 def _check_base_destructor(self, base):
481 record = []
482 class MyIO(base):
483 def __init__(self):
484 # This exercises the availability of attributes on object
485 # destruction.
486 # (in the C version, close() is called by the tp_dealloc
487 # function, not by __del__)
488 self.on_del = 1
489 self.on_close = 2
490 self.on_flush = 3
491 def __del__(self):
492 record.append(self.on_del)
493 try:
494 f = super().__del__
495 except AttributeError:
496 pass
497 else:
498 f()
499 def close(self):
500 record.append(self.on_close)
501 super().close()
502 def flush(self):
503 record.append(self.on_flush)
504 super().flush()
505 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000506 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000507 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000508 self.assertEqual(record, [1, 2, 3])
509
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000510 def test_IOBase_destructor(self):
511 self._check_base_destructor(self.IOBase)
512
513 def test_RawIOBase_destructor(self):
514 self._check_base_destructor(self.RawIOBase)
515
516 def test_BufferedIOBase_destructor(self):
517 self._check_base_destructor(self.BufferedIOBase)
518
519 def test_TextIOBase_destructor(self):
520 self._check_base_destructor(self.TextIOBase)
521
Guido van Rossum87429772007-04-10 21:06:59 +0000522 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000523 with self.open(support.TESTFN, "wb") as f:
524 f.write(b"xxx")
525 with self.open(support.TESTFN, "rb") as f:
526 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000527
Guido van Rossumd4103952007-04-12 05:44:49 +0000528 def test_array_writes(self):
529 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000530 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000531 with self.open(support.TESTFN, "wb", 0) as f:
532 self.assertEqual(f.write(a), n)
533 with self.open(support.TESTFN, "wb") as f:
534 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000535
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000536 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000537 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000538 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000539
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000540 def test_read_closed(self):
541 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000542 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000543 with self.open(support.TESTFN, "r") as f:
544 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000545 self.assertEqual(file.read(), "egg\n")
546 file.seek(0)
547 file.close()
548 self.assertRaises(ValueError, file.read)
549
550 def test_no_closefd_with_filename(self):
551 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000552 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000553
554 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000555 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000556 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000557 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000558 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000559 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000560 self.assertEqual(file.buffer.raw.closefd, False)
561
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000562 def test_garbage_collection(self):
563 # FileIO objects are collected, and collecting them flushes
564 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000565 with support.check_warnings(('', ResourceWarning)):
566 f = self.FileIO(support.TESTFN, "wb")
567 f.write(b"abcxxx")
568 f.f = f
569 wr = weakref.ref(f)
570 del f
571 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000572 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000573 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000574 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000575
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000576 def test_unbounded_file(self):
577 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
578 zero = "/dev/zero"
579 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000580 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000581 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000582 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000583 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000584 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000585 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000586 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000587 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000588 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000589 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000590 self.assertRaises(OverflowError, f.read)
591
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200592 def check_flush_error_on_close(self, *args, **kwargs):
593 # Test that the file is closed despite failed flush
594 # and that flush() is called before file closed.
595 f = self.open(*args, **kwargs)
596 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000597 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200598 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200599 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000600 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200601 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600602 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200603 self.assertTrue(closed) # flush() called
604 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200605 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200606
607 def test_flush_error_on_close(self):
608 # raw file
609 # Issue #5700: io.FileIO calls flush() after file closed
610 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
611 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
612 self.check_flush_error_on_close(fd, 'wb', buffering=0)
613 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
614 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
615 os.close(fd)
616 # buffered io
617 self.check_flush_error_on_close(support.TESTFN, 'wb')
618 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
619 self.check_flush_error_on_close(fd, 'wb')
620 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
621 self.check_flush_error_on_close(fd, 'wb', closefd=False)
622 os.close(fd)
623 # text io
624 self.check_flush_error_on_close(support.TESTFN, 'w')
625 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
626 self.check_flush_error_on_close(fd, 'w')
627 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
628 self.check_flush_error_on_close(fd, 'w', closefd=False)
629 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000630
631 def test_multi_close(self):
632 f = self.open(support.TESTFN, "wb", buffering=0)
633 f.close()
634 f.close()
635 f.close()
636 self.assertRaises(ValueError, f.flush)
637
Antoine Pitrou328ec742010-09-14 18:37:24 +0000638 def test_RawIOBase_read(self):
639 # Exercise the default RawIOBase.read() implementation (which calls
640 # readinto() internally).
641 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
642 self.assertEqual(rawio.read(2), b"ab")
643 self.assertEqual(rawio.read(2), b"c")
644 self.assertEqual(rawio.read(2), b"d")
645 self.assertEqual(rawio.read(2), None)
646 self.assertEqual(rawio.read(2), b"ef")
647 self.assertEqual(rawio.read(2), b"g")
648 self.assertEqual(rawio.read(2), None)
649 self.assertEqual(rawio.read(2), b"")
650
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400651 def test_types_have_dict(self):
652 test = (
653 self.IOBase(),
654 self.RawIOBase(),
655 self.TextIOBase(),
656 self.StringIO(),
657 self.BytesIO()
658 )
659 for obj in test:
660 self.assertTrue(hasattr(obj, "__dict__"))
661
Ross Lagerwall59142db2011-10-31 20:34:46 +0200662 def test_opener(self):
663 with self.open(support.TESTFN, "w") as f:
664 f.write("egg\n")
665 fd = os.open(support.TESTFN, os.O_RDONLY)
666 def opener(path, flags):
667 return fd
668 with self.open("non-existent", "r", opener=opener) as f:
669 self.assertEqual(f.read(), "egg\n")
670
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200671 def test_fileio_closefd(self):
672 # Issue #4841
673 with self.open(__file__, 'rb') as f1, \
674 self.open(__file__, 'rb') as f2:
675 fileio = self.FileIO(f1.fileno(), closefd=False)
676 # .__init__() must not close f1
677 fileio.__init__(f2.fileno(), closefd=False)
678 f1.readline()
679 # .close() must not close f2
680 fileio.close()
681 f2.readline()
682
def test_nonbuffered_textio(self):
    # Opening a text file with buffering=0 must raise ValueError and
    # must not emit any warning, even after a garbage collection.
    with warnings.catch_warnings(record=True) as caught:
        with self.assertRaises(ValueError):
            self.open(support.TESTFN, 'w', buffering=0)
        support.gc_collect()
    self.assertEqual(caught, [])
689
def test_invalid_newline(self):
    # An invalid newline argument must raise ValueError and must not
    # emit any warning, even after a garbage collection.
    with warnings.catch_warnings(record=True) as caught:
        with self.assertRaises(ValueError):
            self.open(support.TESTFN, 'w', newline='invalid')
        support.gc_collect()
    self.assertEqual(caught, [])
696
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200697
class CIOTest(IOTest):
    # IOTest plus checks that are only defined for this variant.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        # Make the instance part of a reference cycle.
        obj.obj = obj
        wr = weakref.ref(obj)
        # Drop the only direct references to both the class and the instance
        # so that the collection below has to break the cycle.
        del MyIO
        del obj
        support.gc_collect()
        # The weak reference must be dead once the cycle has been collected.
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000717
class PyIOTest(IOTest):
    # Runs the inherited IOTest tests unchanged; nothing is added here.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000720
Guido van Rossuma9e20242007-03-08 00:43:48 +0000721
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Uses support.detect_api_mismatch() to compare io.RawIOBase and
    # pyio.RawIOBase in both directions.

    @unittest.expectedFailure  # Test to be fixed by issue9858.
    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        missing = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase)
        self.assertEqual(
            missing, set(),
            msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(
            missing, set(),
            msg='C RawIOBase does not have all Python RawIOBase methods')
735
736
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    # The buffered class under test is read from ``self.tp``; the raw mock
    # classes are read from attributes such as ``self.MockRawIO``.

    def test_detach(self):
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        self.assertIs(buffered.detach(), raw)
        # A second detach() has nothing left to detach.
        with self.assertRaises(ValueError):
            buffered.detach()

        repr(buffered)  # Should still work

    def test_fileno(self):
        bufio = self.tp(self.MockRawIO())

        self.assertEqual(42, bufio.fileno())

    @unittest.skip('test having existential crisis')
    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        bufio = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 9):
            with self.assertRaises(ValueError):
                bufio.seek(0, bad_whence)

    def test_override_destructor(self):
        # Record the order in which overridden __del__/close/flush run when
        # the buffered object is destroyed.
        base = self.tp
        calls = []

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                try:
                    parent_del = super().__del__
                except AttributeError:
                    pass
                else:
                    parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        # flush() is only expected in the record for writable objects.
        expected = [1, 2, 3] if writable else [1, 2]
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        bufio = self.tp(self.MockRawIO())
        with bufio:
            pass
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        with self.assertRaises(ValueError):
            with bufio:
                pass

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()

        def touch_missing_attribute():
            self.tp(rawio).xyzzy

        with support.captured_output("stderr") as stderr:
            self.assertRaises(AttributeError, touch_missing_attribute)
        message = stderr.getvalue().strip()
        if message:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(message.splitlines()), 1)
            self.assertTrue(message.startswith("Exception OSError: "), message)
            self.assertTrue(message.endswith(" ignored"), message)

    def test_repr(self):
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
        # Without a name on the raw object only the class name is shown.
        self.assertEqual(repr(buffered), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        closed_at_flush = []

        def bad_flush():
            closed_at_flush[:] = [buffered.closed, raw.closed]
            raise OSError()

        raw.flush = bad_flush
        buffered = self.tp(raw)
        with self.assertRaises(OSError):  # exception not swallowed
            buffered.close()
        self.assertTrue(buffered.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(closed_at_flush)  # flush() called
        # flush() called before file closed
        self.assertFalse(closed_at_flush[0])
        self.assertFalse(closed_at_flush[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and close() fail, the close() error propagates
        # with the flush() error attached as its __context__.
        raw = self.MockRawIO()

        def bad_flush():
            raise OSError('flush')

        def bad_close():
            raise OSError('close')

        raw.close = bad_close
        buffered = self.tp(raw)
        buffered.flush = bad_flush
        with self.assertRaises(OSError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertEqual(error.args, ('close',))
        self.assertIsInstance(error.__context__, OSError)
        self.assertEqual(error.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw = self.MockRawIO()

        # Both helpers fail with NameError by referencing undefined names.
        def bad_flush():
            raise non_existing_flush

        def bad_close():
            raise non_existing_close

        raw.close = bad_close
        buffered = self.tp(raw)
        buffered.flush = bad_flush
        with self.assertRaises(NameError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertIn('non_existing_close', str(error))
        self.assertIsInstance(error.__context__, NameError)
        self.assertIn('non_existing_flush', str(error.__context__))
        self.assertFalse(buffered.closed)

    def test_multi_close(self):
        # Repeated close() calls are harmless; flush() afterwards raises.
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            bufio.tell()
        with self.assertRaises(self.UnsupportedOperation):
            bufio.seek(0)

    def test_readonly_attributes(self):
        # The ``raw`` attribute cannot be rebound on a buffered object.
        buffered = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
Guido van Rossum78892e42007-04-06 17:31:18 +0000903
class SizeofTest:
    # sys.getsizeof() checks for buffered objects. Like the other mixins in
    # this file it reads the class under test from ``self.tp``.

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must grow by exactly the buffer size difference.
        small_buffer = 4096
        large_buffer = 8192
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=small_buffer)
        overhead = sys.getsizeof(bufio) - small_buffer
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=large_buffer)
        self.assertEqual(sys.getsizeof(bufio), overhead + large_buffer)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the reported size no longer includes the buffer.
        buffer_size = 4096
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=buffer_size)
        overhead = sys.getsizeof(bufio) - buffer_size
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200925
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Reader-specific tests. Subclasses set ``tp`` to the concrete
    # BufferedReader implementation to exercise.
    read_mode = "rb"

    def test_constructor(self):
        # __init__() may be called again on an existing object, with or
        # without an explicit buffer size.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        # Zero and negative buffer sizes are rejected.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object created with __new__() only must be safe to destroy and
        # must refuse to read until __init__() has run.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # read1() results are checked together with the mock's ``_reads``
        # counter after each call.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        # A short read only overwrites the first byte of the target.
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")
        # Same again with a raw stream that yields None after its data.
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

    def test_readinto1(self):
        buffer_size = 10
        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
        bufio = self.tp(rawio, buffer_size=buffer_size)
        b = bytearray(2)
        self.assertEqual(bufio.peek(3), b'abc')
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 1)
        self.assertEqual(b[:1], b"c")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"de")
        self.assertEqual(rawio._reads, 2)
        # Repeat with a target twice as large as the internal buffer.
        b = bytearray(2*buffer_size)
        self.assertEqual(bufio.peek(3), b'fgh')
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(bufio.readinto1(b), 6)
        self.assertEqual(b[:6], b"fghjkl")
        self.assertEqual(rawio._reads, 4)

    def _check_readinto_array(self, method_name):
        # Shared body of test_readinto_array and test_readinto1_array;
        # *method_name* is the buffered method to call ("readinto" or
        # "readinto1").
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        # A unittest assertion rather than a bare ``assert`` so that the
        # precondition is still checked under ``python -O``.
        self.assertNotEqual(len(b), 16)

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = getattr(bufio, method_name)(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readinto_array(self):
        self._check_readinto_array("readinto")

    def test_readinto1_array(self):
        self._check_readinto_array("readinto1")

    def test_readlines(self):
        def bufio():
            # Fresh reader over the same three chunks for every assertion.
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to bufio.read(), and the
        # read history expected on the raw object afterwards.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        # Record the failure for the main thread, then let
                        # it propagate in this thread as well.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Every byte value must have been read exactly N times.
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # Same as the common test, plus a repeat after some data was read.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1193
1194
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the reader tests (plus the sizeof checks) on io.BufferedReader.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every failed re-initialization, read() must raise ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The FileIO below creates support.TESTFN on disk; make sure it is
        # removed again whether or not the test passes.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            # Put the buffered object into a reference cycle with itself.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1241
1242
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited reader tests on pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001245
Guido van Rossuma9e20242007-03-08 00:43:48 +00001246
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001247class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1248 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001249
def test_constructor(self):
    # __init__() may be re-run on a live writer; invalid buffer sizes are
    # rejected, and data written before and after all reaches the raw object.
    rawio = self.MockRawIO()
    bufio = self.tp(rawio)
    bufio.__init__(rawio)
    for good_size in (1024, 16):
        bufio.__init__(rawio, buffer_size=good_size)
    self.assertEqual(3, bufio.write(b"abc"))
    bufio.flush()
    for bad_size in (0, -16, -1):
        with self.assertRaises(ValueError):
            bufio.__init__(rawio, buffer_size=bad_size)
    bufio.__init__(rawio)
    self.assertEqual(3, bufio.write(b"ghi"))
    bufio.flush()
    written = b"".join(rawio._write_stack)
    self.assertEqual(written, b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001265
def test_uninitialized(self):
    # An object created with __new__() only must be safe to destroy and
    # must refuse to write until __init__() has run.
    cls = self.tp
    bufio = cls.__new__(cls)
    del bufio
    bufio = cls.__new__(cls)
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        bufio.write(b'')
    bufio.__init__(self.MockRawIO())
    self.assertEqual(bufio.write(b''), 0)
1275
def test_detach_flush(self):
    # detach() must push pending buffered data down to the raw object.
    raw = self.MockRawIO()
    buffered = self.tp(raw)
    buffered.write(b"howdy!")
    # Nothing has reached the raw object yet.
    self.assertFalse(raw._write_stack)
    buffered.detach()
    self.assertEqual(raw._write_stack, [b"howdy!"])
1283
def test_write(self):
    # Write to the buffered IO but don't overflow the buffer.
    raw = self.MockRawIO()
    buffered = self.tp(raw, 8)
    buffered.write(b"abc")
    self.assertFalse(raw._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001290
def test_write_overflow(self):
    # Feed 16 bytes through an 8-byte buffer in 3-byte slices.
    raw = self.MockRawIO()
    buffered = self.tp(raw, 8)
    contents = b"abcdefghijklmnop"
    for start in range(0, len(contents), 3):
        piece = contents[start:start + 3]
        buffered.write(piece)
    flushed = b"".join(raw._write_stack)
    # At least (total - 8) bytes were implicitly flushed, perhaps more
    # depending on the implementation.
    self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001301
def check_writes(self, intermediate_func):
    # Lots of writes, test the flushed output is as expected.
    # *intermediate_func* is called with the buffered object after
    # every single write.
    contents = bytes(range(256)) * 1000
    total = len(contents)
    raw = self.MockRawIO()
    bufio = self.tp(raw, 13)

    def gen_sizes():
        # Generator of write sizes: repeat each N 15 times then proceed
        # to N+1
        for size in count(1):
            for _ in range(15):
                yield size

    sizes = gen_sizes()
    offset = 0
    while offset < total:
        # Never write past the end of the contents.
        chunk_len = min(next(sizes), total - offset)
        chunk = contents[offset:offset + chunk_len]
        self.assertEqual(bufio.write(chunk), chunk_len)
        intermediate_func(bufio)
        offset += chunk_len
    bufio.flush()
    self.assertEqual(contents, b"".join(raw._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001321
def test_writes(self):
    # Plain writes: nothing happens between two writes.
    def do_nothing(bufio):
        return None
    self.check_writes(do_nothing)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001324
def test_writes_and_flushes(self):
    # Flush explicitly after every write.
    def flush_now(bufio):
        return bufio.flush()
    self.check_writes(flush_now)
Guido van Rossum01a27522007-03-07 01:00:12 +00001327
def test_writes_and_seeks(self):
    # Seek around after every write, always ending up at the position
    # the write left off at.
    def seek_absolute(bufio):
        here = bufio.tell()
        bufio.seek(here + 1, 0)
        bufio.seek(here - 1, 0)
        bufio.seek(here, 0)

    self.check_writes(seek_absolute)

    def seek_relative(bufio):
        here = bufio.seek(0, 1)
        bufio.seek(+1, 1)
        bufio.seek(-1, 1)
        bufio.seek(here, 0)

    self.check_writes(seek_relative)
Guido van Rossum01a27522007-03-07 01:00:12 +00001341
def test_writes_and_truncates(self):
    # Truncate at the current position after every write.
    def truncate_here(bufio):
        return bufio.truncate(bufio.tell())
    self.check_writes(truncate_here)
Guido van Rossum01a27522007-03-07 01:00:12 +00001344
def test_write_non_blocking(self):
    raw = self.MockNonBlockWriterIO()
    bufio = self.tp(raw, 8)

    for payload, expected_count in ((b"abcd", 4), (b"efghi", 5)):
        self.assertEqual(bufio.write(payload), expected_count)
    # 1 byte will be written, the rest will be buffered
    raw.block_on(b"k")
    self.assertEqual(bufio.write(b"jklmn"), 5)

    # 8 bytes will be written, 8 will be buffered and the rest will be lost
    raw.block_on(b"0")
    try:
        bufio.write(b"opqrwxyz0123456789")
    except self.BlockingIOError as exc:
        written = exc.characters_written
    else:
        self.fail("BlockingIOError should have been raised")
    self.assertEqual(written, 16)
    self.assertEqual(raw.pop_written(),
                     b"abcdefghijklmnopqrwxyz")

    self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
    flushed = raw.pop_written()
    # Previously buffered bytes were flushed
    self.assertTrue(flushed.startswith(b"01234567A"), flushed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001371
def test_write_and_rewind(self):
    # Write, seek back to overwrite the start, then append more data.
    target = io.BytesIO()
    bufio = self.tp(target, 4)
    self.assertEqual(bufio.write(b"abcdef"), 6)
    self.assertEqual(bufio.tell(), 6)
    bufio.seek(0, 0)
    self.assertEqual(bufio.write(b"XY"), 2)
    bufio.seek(6, 0)
    # The overwrite is visible in the underlying BytesIO at this point.
    self.assertEqual(target.getvalue(), b"XYcdef")
    self.assertEqual(bufio.write(b"123456"), 6)
    bufio.flush()
    self.assertEqual(target.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001384
def test_flush(self):
    # After flush() the written bytes must be the first chunk recorded by
    # the raw mock.
    sink = self.MockRawIO()
    buffered = self.tp(sink, 8)
    buffered.write(b"abc")
    buffered.flush()
    first_chunk = sink._write_stack[0]
    self.assertEqual(b"abc", first_chunk)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001391
def test_writelines(self):
    # writelines() with a plain list of bytes objects.
    chunks = [b'ab', b'cd', b'ef']
    sink = self.MockRawIO()
    buffered = self.tp(sink, 8)
    buffered.writelines(chunks)
    buffered.flush()
    received = b''.join(sink._write_stack)
    self.assertEqual(received, b'abcdef')
1399
def test_writelines_userlist(self):
    # writelines() must also accept a list-like object (UserList), not
    # only a real list.
    chunks = UserList([b'ab', b'cd', b'ef'])
    sink = self.MockRawIO()
    buffered = self.tp(sink, 8)
    buffered.writelines(chunks)
    buffered.flush()
    received = b''.join(sink._write_stack)
    self.assertEqual(received, b'abcdef')
1407
def test_writelines_error(self):
    # writelines() rejects non-bytes items, a non-iterable, and a str.
    sink = self.MockRawIO()
    buffered = self.tp(sink, 8)
    for invalid in ([1, 2, 3], None, 'abc'):
        self.assertRaises(TypeError, buffered.writelines, invalid)
1414
def test_destructor(self):
    # Dropping the last reference to the buffered writer must flush the
    # pending data to the raw mock.
    sink = self.MockRawIO()
    buffered = self.tp(sink, 8)
    buffered.write(b"abc")
    del buffered
    # Force a collection so the check does not depend on refcounting.
    support.gc_collect()
    first_chunk = sink._write_stack[0]
    self.assertEqual(b"abc", first_chunk)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001422
def test_truncate(self):
    # Truncate implicitly flushes the buffer.
    path = support.TESTFN
    with self.open(path, self.write_mode, buffering=0) as raw:
        buffered = self.tp(raw, 8)
        buffered.write(b"abcdef")
        new_size = buffered.truncate(3)
        self.assertEqual(new_size, 3)
        # The stream position is not moved by truncate().
        self.assertEqual(buffered.tell(), 6)
    with self.open(path, "rb", buffering=0) as reader:
        on_disk = reader.read()
        self.assertEqual(on_disk, b"abc")
1432
@unittest.skipUnless(threading, 'Threading required for this test.')
@support.requires_resource('cpu')
def test_threads(self):
    try:
        # Write out many bytes from many threads and test they were
        # all flushed.
        N = 1000
        contents = bytes(range(256)) * N
        # Chop the payload into chunks of alternating sizes 1 and 19.
        chunk_sizes = cycle([1, 19])
        pending = deque()
        offset = 0
        while offset < len(contents):
            step = next(chunk_sizes)
            pending.append(contents[offset:offset + step])
            offset += step
        del contents
        # We use a real file object because it allows us to
        # exercise situations where the GIL is released before
        # writing the buffer to the raw streams. This is in addition
        # to concurrency issues due to switching threads in the middle
        # of Python code.
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []

            def drain_queue():
                # Worker: write chunks until the shared queue is empty;
                # record (and re-raise) anything unexpected.
                try:
                    while True:
                        try:
                            chunk = pending.popleft()
                        except IndexError:
                            return
                        bufio.write(chunk)
                except Exception as exc:
                    errors.append(exc)
                    raise

            workers = [threading.Thread(target=drain_queue)
                       for _ in range(20)]
            with support.start_threads(workers):
                time.sleep(0.02) # yield
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            bufio.close()
        with self.open(support.TESTFN, "rb") as result_file:
            data = result_file.read()
        # Every byte value must occur exactly N times, whatever the order
        # in which the threads interleaved their writes.
        for value in range(256):
            self.assertEqual(data.count(bytes([value])), N)
    finally:
        support.unlink(support.TESTFN)
1480
def test_misbehaved_io(self):
    # seek(), tell() and write() must all raise OSError on top of the
    # misbehaving raw mock.
    raw = self.MisbehavedRawIO()
    buffered = self.tp(raw, 5)
    attempts = (
        (buffered.seek, (0,)),
        (buffered.tell, ()),
        (buffered.write, (b"abcdef",)),
    )
    for operation, arguments in attempts:
        self.assertRaises(OSError, operation, *arguments)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001487
def test_max_buffer_size_removal(self):
    # A third positional argument (the old max_buffer_size) is rejected.
    with self.assertRaises(TypeError):
        raw = self.MockRawIO()
        self.tp(raw, 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001491
def test_write_error_on_close(self):
    # A failing raw write during close() must propagate, yet the buffered
    # object must still end up closed.
    def failing_write(data):
        raise OSError()

    raw = self.MockRawIO()
    raw.write = failing_write
    buffered = self.tp(raw)
    buffered.write(b'spam')
    self.assertRaises(OSError, buffered.close) # exception not swallowed
    self.assertTrue(buffered.closed)
1501
Benjamin Peterson59406a92009-03-26 17:10:29 +00001502
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriter tests against the C implementation
    # (io.BufferedWriter) and adds a few checks specific to it.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          buffered.__init__, raw, sys.maxsize)

    def test_initialization(self):
        # Re-initialising with an invalid buffer size raises ValueError, and
        # so does a write attempted afterwards.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        for invalid_size in (0, -16, -1):
            self.assertRaises(ValueError, buffered.__init__, raw,
                              buffer_size=invalid_size)
            self.assertRaises(ValueError, buffered.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        with support.check_warnings(('', ResourceWarning)):
            raw = self.FileIO(support.TESTFN, "w+b")
            buffered = self.tp(raw)
            buffered.write(b"123xxx")
            # Create a reference cycle so only the cyclic GC can free it.
            buffered.x = buffered
            wr = weakref.ref(buffered)
            del buffered
            support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as reader:
            self.assertEqual(reader.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            raw = io.BytesIO()
            self.tp(raw, 1024, 1024, 1024)
1546
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001547
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriter tests against the pure-Python
    # implementation.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001550
class BufferedRWPairTest(unittest.TestCase):
    # Tests shared by the C and Python BufferedRWPair implementations.
    # Subclasses choose the implementation through ``tp``.  The helpers
    # reached through ``self`` (MockRawIO, BytesIO, UnsupportedOperation)
    # are not defined here -- presumably injected by the test loader.

    def test_constructor(self):
        reader, writer = self.MockRawIO(), self.MockRawIO()
        pair = self.tp(reader, writer)
        self.assertFalse(pair.closed)

    def test_uninitialized(self):
        # An object created with __new__ only must be safely deletable and
        # must refuse I/O until __init__ has run.
        pair = self.tp.__new__(self.tp)
        del pair
        pair = self.tp.__new__(self.tp)
        expected_errors = (ValueError, AttributeError)
        message_pattern = 'uninitialized|has no attribute'
        self.assertRaisesRegex(expected_errors, message_pattern,
                               pair.read, 0)
        self.assertRaisesRegex(expected_errors, message_pattern,
                               pair.write, b'')
        pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(pair.read(0), b'')
        self.assertEqual(pair.write(b''), 0)

    def test_detach(self):
        reader, writer = self.MockRawIO(), self.MockRawIO()
        pair = self.tp(reader, writer)
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument (the old max_buffer_size) is rejected.
        with self.assertRaises(TypeError):
            reader, writer = self.MockRawIO(), self.MockRawIO()
            self.tp(reader, writer, 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        for size, expected in ((3, b"abc"), (1, b"d")):
            self.assertEqual(pair.read(size), expected)
        self.assertEqual(pair.read(), b"ef")
        # read(None) behaves like read() with no argument.
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        def fresh_pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        all_lines = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(fresh_pair().readlines(), all_lines)
        self.assertEqual(fresh_pair().readlines(), all_lines)
        # With a size hint of 5 only the first two lines are returned.
        self.assertEqual(fresh_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        target = bytearray(5)
        filled = pair.readinto(target)
        self.assertEqual(filled, 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), sink)

        # Each write is flushed separately, so the mock records two chunks.
        for chunk in (b"abc", b"def"):
            pair.write(chunk)
            pair.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        peeked = pair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        # Peeking did not consume anything.
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_reader_close_error_on_close(self):
        def reader_close():
            # Referencing an undefined name raises NameError on purpose.
            reader_non_existing

        reader = self.MockRawIO()
        reader.close = reader_close
        writer = self.MockRawIO()
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            pair.close()
        self.assertIn('reader_non_existing', str(caught.exception))
        # The writer side was still closed although the reader failed.
        self.assertTrue(pair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

    def test_writer_close_error_on_close(self):
        def writer_close():
            # Referencing an undefined name raises NameError on purpose.
            writer_non_existing

        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            pair.close()
        self.assertIn('writer_non_existing', str(caught.exception))
        # The reader side was still closed although the writer failed.
        self.assertFalse(pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

    def test_reader_writer_close_error_on_close(self):
        def reader_close():
            reader_non_existing

        def writer_close():
            writer_non_existing

        reader = self.MockRawIO()
        reader.close = reader_close
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            pair.close()
        # The reader's error is the one raised; the writer's error is
        # chained to it as __context__.
        raised = caught.exception
        self.assertIn('reader_non_existing', str(raised))
        self.assertIsInstance(raised.__context__, NameError)
        self.assertIn('writer_non_existing', str(raised.__context__))
        self.assertFalse(pair.closed)
        self.assertFalse(reader.closed)
        self.assertFalse(writer.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        # The pair reports a tty as soon as either side is one.
        tty_combinations = ((True, False), (False, True), (True, True))
        for reader_is_tty, writer_is_tty in tty_combinations:
            pair = self.tp(SelectableIsAtty(reader_is_tty),
                           SelectableIsAtty(writer_is_tty))
            self.assertTrue(pair.isatty())

    def test_weakref_clearing(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        pair_ref = weakref.ref(pair)
        # Drop the object first, then the weak reference to it.
        pair = None
        pair_ref = None # Shouldn't segfault.
1734
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against the C implementation.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001737
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against the pure-Python
    # implementation.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001740
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001741
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Tests for BufferedRandom: everything inherited from the reader and
    # writer suites, plus checks for interleaved reading and writing.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_constructor(self)

    def test_uninitialized(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(raw, 8)

        self.assertEqual(b"as", stream.read(2))
        for chunk in (b"ddd", b"eee"):
            stream.write(chunk)
        self.assertFalse(raw._write_stack) # Buffer writes
        # The read below is expected to push the buffered writes out first.
        self.assertEqual(b"ghjk", stream.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        stream = self.tp(raw)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        # Seek relative to the end, then relative to the current position.
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        self.assertRaises(TypeError, stream.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario: ``read_func(bufio[, n])`` is the read primitive
        # under test (read, readinto or a peek-based emulation).
        raw = self.BytesIO(b"abcdefghi")
        stream = self.tp(raw)

        self.assertEqual(b"ab", read_func(stream, 2))
        stream.write(b"12")
        self.assertEqual(b"ef", read_func(stream, 2))
        self.assertEqual(6, stream.tell())
        stream.flush()
        self.assertEqual(6, stream.tell())
        self.assertEqual(b"ghi", read_func(stream))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(stream, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)

        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            capacity = n if n >= 0 else 9999
            target = bytearray(capacity)
            filled = bufio.readinto(target)
            return bytes(target[:filled])

        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # Emulate a read by advancing past what was peeked.
            bufio.seek(len(data), 1)
            return data

        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        stream = self.tp(raw)

        for chunk in (b"123", b"45"):
            stream.write(chunk)
            stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", stream.read())

    def test_threads(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_threads(self)

    def test_writes_and_peek(self):
        # First variant: peek right at the write position.
        def _peek(bufio):
            bufio.peek(1)

        self.check_writes(_peek)

        # Second variant: step back one byte, peek, restore the position.
        def _peek(bufio):
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)

        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)

        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)

        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))

        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            stream = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(stream.read(1), b"A")
            self.assertEqual(stream.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            stream.write(b"B" * overwrite_size)
            end_position = overwrite_size + 1
            self.assertEqual(stream.tell(), end_position)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            stream.flush()
            self.assertEqual(stream.tell(), end_position)
            expected = (b"A" + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            self.assertEqual(raw.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for first in range(0, size):
            for second in range(first, size):
                raw = self.BytesIO(original)
                stream = self.tp(raw, 100)
                mutate(stream, first, second)
                stream.flush()
                # When first == second the later write (1) wins.
                expected = bytearray(original)
                expected[second] = 2
                expected[first] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (first, second))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        stream = self.tp(raw, 100)
        self.assertEqual(stream.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(stream.truncate(), 2)
        self.assertEqual(stream.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(stream.truncate(), 4)

    def test_misbehaved_io(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            one_byte = bytearray(1)
            f.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Alternate single-byte writes with readline() calls.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as f:
            f.write(b'1')
            self.assertEqual(f.readline(), b'b\n')
            f.write(b'2')
            self.assertEqual(f.readline(), b'def\n')
            f.write(b'3')
            self.assertEqual(f.readline(), b'\n')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1969
R David Murray67bfe802013-02-23 21:51:05 -05001970
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Runs the shared BufferedRandom tests against the C implementation
    # (io.BufferedRandom) and adds a few checks specific to it.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          buffered.__init__, raw, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the garbage-collection checks of both C base suites.
        for suite in (CBufferedReaderTest, CBufferedWriterTest):
            suite.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            raw = io.BytesIO()
            self.tp(raw, 1024, 1024, 1024)
1992
1993
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandom tests against the pure-Python
    # implementation.
    tp = pyio.BufferedRandom
1996
1997
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001998# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1999# properties:
2000# - A single output character can correspond to many bytes of input.
2001# - The number of input bytes to complete the character can be
2002# undetermined until the last input byte is received.
2003# - The number of input bytes can vary depending on previous input.
2004# - A single input byte can correspond to many characters of output.
2005# - The number of output characters can be undetermined until the
2006# last input byte is received.
2007# - The number of output characters can vary depending on previous input.
2008
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    Stateful, buffering decoder used to test seek/tell behavior.

    The input is a sequence of words.  Words are either fixed-length (the
    length is set by the input itself) or variable-length, in which case
    they are terminated by a period and extra periods are ignored.  The
    possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is emitted as a word followed by a period.  The
        emitted word is the input word truncated or padded out with
        hyphens so that its length equals O.  If O is 0, the word is
        emitted verbatim, without truncating or padding.
    I and O both start at 1.  After I changes, the bytes that follow are
    grouped according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.i = self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (pending bytes, flags); flags pack I and O in one int."""
        # Both lengths are XOR-ed with 1 so that flags == 0 after reset().
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input*; return the text of completed words."""
        period = ord('.')
        pieces = []
        for byte in input:
            # self.i is re-read on every byte: a completed 'i' word changes
            # the mode for the bytes that follow it.
            if self.i != 0:
                # fixed-length, terminate after self.i bytes
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != period:
                # variable-length: keep collecting until a period shows up
                self.buffer.append(byte)
            elif self.buffer:
                # a period ends the word; periods after an empty buffer
                # are ignored
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word; return its output ('' for i/o words)."""
        marker = self.buffer[0]
        if marker == ord('i'):
            # set input length
            self.i = min(99, int(self.buffer[1:] or 0))
            result = ''
        elif marker == ord('o'):
            # set output length
            self.o = min(99, int(self.buffer[1:] or 0))
            result = ''
        else:
            word = self.buffer.decode('ascii')
            if len(word) < self.o:
                # pad out with hyphens (over-padding is trimmed just below)
                word += '-' * self.o
            if self.o:
                # truncate to output length
                word = word[:self.o]
            result = word + '.'
        self.buffer = bytearray()
        return result

    # The codec lookup function answers only while this flag is set.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: serve 'test_decoder' while enabled."""
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder',
            encode=latin1.encode,
            decode=None,
            incrementalencoder=None,
            streamreader=None,
            streamwriter=None,
            incrementaldecoder=cls)
2093
# Install the lookup function of the decoder defined above as a codec
# search function.  It answers nothing until a test turns on
# StatefulIncrementalDecoder.codecEnabled.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2097
2098
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Sanity checks for the StatefulIncrementalDecoder helper itself.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, is_final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, is_final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
2140 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002141
2142class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002143
def setUp(self):
    # Raw bytes mixing "\r\n", "\r" and "\n" line endings ...
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    # ... and the same text with every line ending written as "\n".
    self.normalized = str(b"AAA\nBBB\nCCC\nDDD\nEEE\n", "ascii")
    # Start each test without a leftover scratch file.
    scratch = support.TESTFN
    support.unlink(scratch)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002148
def tearDown(self):
    # Remove the scratch file a test may have left behind.
    scratch = support.TESTFN
    support.unlink(scratch)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002151
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002152 def test_constructor(self):
2153 r = self.BytesIO(b"\xc3\xa9\n\n")
2154 b = self.BufferedReader(r, 1000)
2155 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002156 t.__init__(b, encoding="latin-1", newline="\r\n")
2157 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002158 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002159 t.__init__(b, encoding="utf-8", line_buffering=True)
2160 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002161 self.assertEqual(t.line_buffering, True)
2162 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002163 self.assertRaises(TypeError, t.__init__, b, newline=42)
2164 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2165
Nick Coghlana9b15242014-02-04 22:11:18 +10002166 def test_non_text_encoding_codecs_are_rejected(self):
2167 # Ensure the constructor complains if passed a codec that isn't
2168 # marked as a text encoding
2169 # http://bugs.python.org/issue20404
2170 r = self.BytesIO()
2171 b = self.BufferedWriter(r)
2172 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2173 self.TextIOWrapper(b, encoding="hex")
2174
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002175 def test_detach(self):
2176 r = self.BytesIO()
2177 b = self.BufferedWriter(r)
2178 t = self.TextIOWrapper(b)
2179 self.assertIs(t.detach(), b)
2180
2181 t = self.TextIOWrapper(b, encoding="ascii")
2182 t.write("howdy")
2183 self.assertFalse(r.getvalue())
2184 t.detach()
2185 self.assertEqual(r.getvalue(), b"howdy")
2186 self.assertRaises(ValueError, t.detach)
2187
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002188 # Operations independent of the detached stream should still work
2189 repr(t)
2190 self.assertEqual(t.encoding, "ascii")
2191 self.assertEqual(t.errors, "strict")
2192 self.assertFalse(t.line_buffering)
2193
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002194 def test_repr(self):
2195 raw = self.BytesIO("hello".encode("utf-8"))
2196 b = self.BufferedReader(raw)
2197 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002198 modname = self.TextIOWrapper.__module__
2199 self.assertEqual(repr(t),
2200 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2201 raw.name = "dummy"
2202 self.assertEqual(repr(t),
2203 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002204 t.mode = "r"
2205 self.assertEqual(repr(t),
2206 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002207 raw.name = b"dummy"
2208 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002209 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002210
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002211 t.buffer.detach()
2212 repr(t) # Should not raise an exception
2213
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002214 def test_line_buffering(self):
2215 r = self.BytesIO()
2216 b = self.BufferedWriter(r, 1000)
2217 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002218 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002219 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002220 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002221 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002222 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002223 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002224
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002225 def test_default_encoding(self):
2226 old_environ = dict(os.environ)
2227 try:
2228 # try to get a user preferred encoding different than the current
2229 # locale encoding to check that TextIOWrapper() uses the current
2230 # locale encoding and not the user preferred encoding
2231 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2232 if key in os.environ:
2233 del os.environ[key]
2234
2235 current_locale_encoding = locale.getpreferredencoding(False)
2236 b = self.BytesIO()
2237 t = self.TextIOWrapper(b)
2238 self.assertEqual(t.encoding, current_locale_encoding)
2239 finally:
2240 os.environ.clear()
2241 os.environ.update(old_environ)
2242
@support.cpython_only
def test_device_encoding(self):
    # Issue 15989
    # A fileno() just above INT_MAX, then just above UINT_MAX, must make
    # the constructor raise OverflowError.
    import _testcapi
    raw = self.BytesIO()
    for limit in (_testcapi.INT_MAX, _testcapi.UINT_MAX):
        raw.fileno = lambda value=limit + 1: value
        self.assertRaises(OverflowError, self.TextIOWrapper, raw)
2252
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002253 def test_encoding(self):
2254 # Check the encoding attribute is always set, and valid
2255 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002256 t = self.TextIOWrapper(b, encoding="utf-8")
2257 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002258 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002259 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002260 codecs.lookup(t.encoding)
2261
2262 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002263 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002264 b = self.BytesIO(b"abc\n\xff\n")
2265 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002266 self.assertRaises(UnicodeError, t.read)
2267 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002268 b = self.BytesIO(b"abc\n\xff\n")
2269 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002270 self.assertRaises(UnicodeError, t.read)
2271 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002272 b = self.BytesIO(b"abc\n\xff\n")
2273 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002274 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002275 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002276 b = self.BytesIO(b"abc\n\xff\n")
2277 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002278 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002279
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002280 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002281 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002282 b = self.BytesIO()
2283 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002284 self.assertRaises(UnicodeError, t.write, "\xff")
2285 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002286 b = self.BytesIO()
2287 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002288 self.assertRaises(UnicodeError, t.write, "\xff")
2289 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002290 b = self.BytesIO()
2291 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002292 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002293 t.write("abc\xffdef\n")
2294 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002295 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002296 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002297 b = self.BytesIO()
2298 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002299 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002300 t.write("abc\xffdef\n")
2301 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002302 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002303
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002304 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002305 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2306
2307 tests = [
2308 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002309 [ '', input_lines ],
2310 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2311 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2312 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002313 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002314 encodings = (
2315 'utf-8', 'latin-1',
2316 'utf-16', 'utf-16-le', 'utf-16-be',
2317 'utf-32', 'utf-32-le', 'utf-32-be',
2318 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002319
Guido van Rossum8358db22007-08-18 21:39:55 +00002320 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002321 # character in TextIOWrapper._pending_line.
2322 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002323 # XXX: str.encode() should return bytes
2324 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002325 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002326 for bufsize in range(1, 10):
2327 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002328 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2329 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002330 encoding=encoding)
2331 if do_reads:
2332 got_lines = []
2333 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002334 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002335 if c2 == '':
2336 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002337 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002338 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002339 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002340 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002341
2342 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002343 self.assertEqual(got_line, exp_line)
2344 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002345
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002346 def test_newlines_input(self):
2347 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002348 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2349 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002350 (None, normalized.decode("ascii").splitlines(keepends=True)),
2351 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002352 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2353 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2354 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002355 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002356 buf = self.BytesIO(testdata)
2357 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002358 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002359 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002360 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002361
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002362 def test_newlines_output(self):
2363 testdict = {
2364 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2365 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2366 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2367 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2368 }
2369 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2370 for newline, expected in tests:
2371 buf = self.BytesIO()
2372 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2373 txt.write("AAA\nB")
2374 txt.write("BB\nCCC\n")
2375 txt.write("X\rY\r\nZ")
2376 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002377 self.assertEqual(buf.closed, False)
2378 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002379
def test_destructor(self):
    # Dropping the wrapper must flush pending text before the underlying
    # stream gets closed.
    seen_at_close = []
    parent = self.BytesIO

    class RecordingBytesIO(parent):
        def close(self):
            # Capture what has been written by the time close() runs.
            seen_at_close.append(self.getvalue())
            parent.close(self)

    raw = RecordingBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002393
def test_override_destructor(self):
    # Overridden __del__, close() and flush() must run in that order when
    # the wrapper is destroyed.
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base class may or may not define __del__.
            inherited = getattr(super(), "__del__", None)
            if inherited is not None:
                inherited()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2416
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def trigger():
        # The temporary wrapper is destroyed while AttributeError
        # propagates.
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, trigger)
    report = captured.getvalue().strip()
    if not report:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(report.splitlines()), 1)
    self.assertTrue(report.startswith("Exception OSError: "), report)
    self.assertTrue(report.endswith(" ignored"), report)
Guido van Rossum8358db22007-08-18 21:39:55 +00002431
Guido van Rossum9b76da62007-04-11 01:09:03 +00002432 # Systematic tests of the text I/O API
2433
def test_basic_io(self):
    # Exercise write/read/seek/tell round trips for several chunk sizes
    # and encodings.  Files are opened with "with" so that they are closed
    # even when an assertion fails.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position at end of file, reused by the checks below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                # Reads at EOF return the empty string and do not move.
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append, then read the appended text back.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2462
def multi_line_test(self, f, enc):
    # Write lines of assorted lengths, recording tell() before each one,
    # then read them back and check positions and contents match.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters up to the wanted length.
        text = "".join([sample[i % len(sample)] for i in range(size)]) + "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    read_back = []
    position = f.tell()
    text = f.readline()
    while text:
        read_back.append((position, text))
        position = f.tell()
        text = f.readline()
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002484
def test_telling(self):
    # tell() must report the same positions while reading as it did while
    # writing.  The file is opened with "with" so that it is closed even
    # when an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail while iteration is in progress.
            self.assertRaises(OSError, f.tell)
        # Once the iteration is exhausted tell() works again.
        self.assertEqual(f.tell(), p2)
2504
def test_seeking(self):
    # The ASCII prefix is two characters shorter than the value reported
    # by _default_chunk_size(); a multi-byte character follows it.
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002521
def test_seeking_too(self):
    # Regression test for a specific bug
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        # A chunk smaller than the 3-byte character written above.
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002532
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened with "with" so that a failing assertion
        # cannot leave the scratch file open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Going back to the cookie must replay the same text.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(data)

        # Position each test case so that it crosses a chunk boundary.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(data)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + data, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002579
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002580 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002581 data = "1234567890"
2582 tests = ("utf-16",
2583 "utf-16-le",
2584 "utf-16-be",
2585 "utf-32",
2586 "utf-32-le",
2587 "utf-32-be")
2588 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002589 buf = self.BytesIO()
2590 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002591 # Check if the BOM is written only once (see issue1753).
2592 f.write(data)
2593 f.write(data)
2594 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002595 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002596 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002597 self.assertEqual(f.read(), data * 2)
2598 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002599
Benjamin Petersona1b49012009-03-31 23:11:32 +00002600 def test_unreadable(self):
2601 class UnReadable(self.BytesIO):
2602 def readable(self):
2603 return False
2604 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002605 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002606
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002607 def test_read_one_by_one(self):
2608 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002609 reads = ""
2610 while True:
2611 c = txt.read(1)
2612 if not c:
2613 break
2614 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002615 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002616
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002617 def test_readlines(self):
2618 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2619 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2620 txt.seek(0)
2621 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2622 txt.seek(0)
2623 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2624
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002625 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002626 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002627 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002628 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002629 reads = ""
2630 while True:
2631 c = txt.read(128)
2632 if not c:
2633 break
2634 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002635 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002636
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002637 def test_writelines(self):
2638 l = ['ab', 'cd', 'ef']
2639 buf = self.BytesIO()
2640 txt = self.TextIOWrapper(buf)
2641 txt.writelines(l)
2642 txt.flush()
2643 self.assertEqual(buf.getvalue(), b'abcdef')
2644
2645 def test_writelines_userlist(self):
2646 l = UserList(['ab', 'cd', 'ef'])
2647 buf = self.BytesIO()
2648 txt = self.TextIOWrapper(buf)
2649 txt.writelines(l)
2650 txt.flush()
2651 self.assertEqual(buf.getvalue(), b'abcdef')
2652
2653 def test_writelines_error(self):
2654 txt = self.TextIOWrapper(self.BytesIO())
2655 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2656 self.assertRaises(TypeError, txt.writelines, None)
2657 self.assertRaises(TypeError, txt.writelines, b'abc')
2658
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002659 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002660 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002661
2662 # read one char at a time
2663 reads = ""
2664 while True:
2665 c = txt.read(1)
2666 if not c:
2667 break
2668 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002669 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002670
2671 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002672 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002673 txt._CHUNK_SIZE = 4
2674
2675 reads = ""
2676 while True:
2677 c = txt.read(4)
2678 if not c:
2679 break
2680 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002681 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002682
2683 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002684 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002685 txt._CHUNK_SIZE = 4
2686
2687 reads = txt.read(4)
2688 reads += txt.read(4)
2689 reads += txt.readline()
2690 reads += txt.readline()
2691 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002692 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002693
2694 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002695 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002696 txt._CHUNK_SIZE = 4
2697
2698 reads = txt.read(4)
2699 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002700 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002701
2702 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002703 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002704 txt._CHUNK_SIZE = 4
2705
2706 reads = txt.read(4)
2707 pos = txt.tell()
2708 txt.seek(0)
2709 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002710 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002711
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002712 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002713 buffer = self.BytesIO(self.testdata)
2714 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002715
2716 self.assertEqual(buffer.seekable(), txt.seekable())
2717
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
            # The position is queried but its value is not needed here.
            f.tell()
        with self.open(path, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(path, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002732
def test_seek_bom(self):
    # Rewriting a file through explicit seeks must not add a second BOM:
    # the final bytes equal a single encoding of the whole text.
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as writer:
            writer.write('aaa')
            end_pos = writer.tell()
        with self.open(path, 'r+', encoding=codec) as stream:
            # First write past the existing text, then overwrite it.
            for offset, text in ((end_pos, 'zzz'), (0, 'bbb')):
                stream.seek(offset)
                stream.write(text)
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'bbbzzz'.encode(codec))
Antoine Pitroue4501852009-05-14 18:55:55 +00002747
def test_seek_append_bom(self):
    # In append mode, seeking to the start and back to the end before
    # writing must not cause another BOM to be written.
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as writer:
            writer.write('aaa')
        with self.open(path, 'a', encoding=codec) as appender:
            appender.seek(0)
            appender.seek(0, self.SEEK_END)
            appender.write('xxx')
        with self.open(path, 'rb') as reader:
            raw = reader.read()
            self.assertEqual(raw, 'aaaxxx'.encode(codec))
2760
def test_errors_property(self):
    # `errors` reads "strict" when no handler is passed to open(), and
    # otherwise the handler that was passed.
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra, expected in cases:
        with self.open(support.TESTFN, "w", **extra) as stream:
            self.assertEqual(stream.errors, expected)
2766
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    go = threading.Event()
    thread_ids = range(20)
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def worker(n):
            line = "Thread%03d\n" % n
            # Every worker blocks here until the event is set, so the
            # writes happen as close together as possible.
            go.wait()
            f.write(line)
        workers = [threading.Thread(target=worker, args=(n,))
                   for n in thread_ids]
        with support.start_threads(workers, go.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for n in thread_ids:
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002785
Antoine Pitrou6be88762010-05-03 16:48:20 +00002786 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002787 # Test that text file is closed despite failed flush
2788 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00002789 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002790 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00002791 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002792 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002793 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002794 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002795 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002796 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002797 self.assertTrue(txt.buffer.closed)
2798 self.assertTrue(closed) # flush() called
2799 self.assertFalse(closed[0]) # flush() called before file closed
2800 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02002801 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00002802
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03002803 def test_close_error_on_close(self):
2804 buffer = self.BytesIO(self.testdata)
2805 def bad_flush():
2806 raise OSError('flush')
2807 def bad_close():
2808 raise OSError('close')
2809 buffer.close = bad_close
2810 txt = self.TextIOWrapper(buffer, encoding="ascii")
2811 txt.flush = bad_flush
2812 with self.assertRaises(OSError) as err: # exception not swallowed
2813 txt.close()
2814 self.assertEqual(err.exception.args, ('close',))
2815 self.assertIsInstance(err.exception.__context__, OSError)
2816 self.assertEqual(err.exception.__context__.args, ('flush',))
2817 self.assertFalse(txt.closed)
2818
2819 def test_nonnormalized_close_error_on_close(self):
2820 # Issue #21677
2821 buffer = self.BytesIO(self.testdata)
2822 def bad_flush():
2823 raise non_existing_flush
2824 def bad_close():
2825 raise non_existing_close
2826 buffer.close = bad_close
2827 txt = self.TextIOWrapper(buffer, encoding="ascii")
2828 txt.flush = bad_flush
2829 with self.assertRaises(NameError) as err: # exception not swallowed
2830 txt.close()
2831 self.assertIn('non_existing_close', str(err.exception))
2832 self.assertIsInstance(err.exception.__context__, NameError)
2833 self.assertIn('non_existing_flush', str(err.exception.__context__))
2834 self.assertFalse(txt.closed)
2835
Antoine Pitrou6be88762010-05-03 16:48:20 +00002836 def test_multi_close(self):
2837 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2838 txt.close()
2839 txt.close()
2840 txt.close()
2841 self.assertRaises(ValueError, txt.flush)
2842
def test_unseekable(self):
    # tell() and seek() on a wrapper around an unseekable stream must
    # raise UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
2847
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002848 def test_readonly_attributes(self):
2849 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2850 buf = self.BytesIO(self.testdata)
2851 with self.assertRaises(AttributeError):
2852 txt.buffer = buf
2853
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    remaining = list(wrapper)
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
2864
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    written = b''.join(raw._write_stack)
    self.assertEqual(written, b'123\n45')
2874
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02002875 def test_bufio_write_through(self):
2876 # Issue #21396: write_through=True doesn't force a flush()
2877 # on the underlying binary buffered object.
2878 flush_called, write_called = [], []
2879 class BufferedWriter(self.BufferedWriter):
2880 def flush(self, *args, **kwargs):
2881 flush_called.append(True)
2882 return super().flush(*args, **kwargs)
2883 def write(self, *args, **kwargs):
2884 write_called.append(True)
2885 return super().write(*args, **kwargs)
2886
2887 rawio = self.BytesIO()
2888 data = b"a"
2889 bufio = BufferedWriter(rawio, len(data)*2)
2890 textio = self.TextIOWrapper(bufio, encoding='ascii',
2891 write_through=True)
2892 # write to the buffered io but don't overflow the buffer
2893 text = data.decode('ascii')
2894 textio.write(text)
2895
2896 # buffer.flush is not called with write_through=True
2897 self.assertFalse(flush_called)
2898 # buffer.write *is* called with write_through=True
2899 self.assertTrue(write_called)
2900 self.assertEqual(rawio.getvalue(), b"") # no flush
2901
2902 write_called = [] # reset
2903 textio.write(text * 10) # total content is larger than bufio buffer
2904 self.assertTrue(write_called)
2905 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
2906
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002907 def test_read_nonbytes(self):
2908 # Issue #17106
2909 # Crash when underlying read() returns non-bytes
2910 t = self.TextIOWrapper(self.StringIO('a'))
2911 self.assertRaises(TypeError, t.read, 1)
2912 t = self.TextIOWrapper(self.StringIO('a'))
2913 self.assertRaises(TypeError, t.readline)
2914 t = self.TextIOWrapper(self.StringIO('a'))
2915 self.assertRaises(TypeError, t.read)
2916
2917 def test_illegal_decoder(self):
2918 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10002919 # Bypass the early encoding check added in issue 20404
2920 def _make_illegal_wrapper():
2921 quopri = codecs.lookup("quopri")
2922 quopri._is_text_encoding = True
2923 try:
2924 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2925 newline='\n', encoding="quopri")
2926 finally:
2927 quopri._is_text_encoding = False
2928 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002929 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10002930 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002931 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10002932 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002933 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10002934 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002935 self.assertRaises(TypeError, t.read)
2936
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    #
    # Runs a child interpreter whose script keeps an object alive until
    # shutdown; that object's __del__ builds a TextIOWrapper with
    # **kwargs and prints "ok".  Returns whatever assert_python_ok()
    # returns (callers unpack it as ``rc, out, err``).
    iomod = self.io.__name__
    # The script hangs off "if 1:", so every statement in it must be
    # indented consistently for the child interpreter to parse it.
    code = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """.format(iomod=iomod, kwargs=kwargs)
    return assert_python_ok("-c", code)
2957
2958 def test_create_at_shutdown_without_encoding(self):
2959 rc, out, err = self._check_create_at_shutdown()
2960 if err:
2961 # Can error out with a RuntimeError if the module state
2962 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10002963 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01002964 else:
2965 self.assertEqual("ok", out.decode().strip())
2966
2967 def test_create_at_shutdown_with_encoding(self):
2968 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
2969 errors='strict')
2970 self.assertFalse(err)
2971 self.assertEqual("ok", out.decode().strip())
2972
def test_read_byteslike(self):
    # TextIOWrapper must cope with a buffer whose read methods return
    # memoryviews instead of bytes.
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    truncated = _to_memoryview(source.getvalue()).tobytes()
    expected = truncated.decode('utf-8')

    self.assertEqual(wrapper.read(200), expected)
2983
Benjamin Peterson6c14f232014-11-12 10:19:46 -05002984 def test_issue22849(self):
2985 class F(object):
2986 def readable(self): return True
2987 def writable(self): return True
2988 def seekable(self): return True
2989
2990 for i in range(10):
2991 try:
2992 self.TextIOWrapper(F(), encoding='utf-8')
2993 except Exception:
2994 pass
2995
2996 F.tell = lambda x: 0
2997 t = self.TextIOWrapper(F(), encoding='utf-8')
2998
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002999
class MemviewBytesIO(io.BytesIO):
    '''A BytesIO object whose read methods return memoryviews
    rather than bytes.

    Whatever the underlying BytesIO read returns is passed through
    _to_memoryview() before being handed back.
    '''

    def read1(self, len_=-1):
        # The size is optional and defaults to -1, mirroring
        # io.BytesIO.read(); callers that pass a size are unaffected.
        return _to_memoryview(super().read1(len_))

    def read(self, len_=-1):
        # Same default as io.BytesIO.read().
        return _to_memoryview(super().read(len_))
3008 return _to_memoryview(super().read(len_))
3009
3010def _to_memoryview(buf):
3011 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3012
3013 arr = array.array('i')
3014 idx = len(buf) - len(buf) % arr.itemsize
3015 arr.frombytes(buf[:idx])
3016 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003017
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003018
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests against the `io` module and
    # adds a few tests of its own.
    io = io
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        # A failed re-initialisation leaves the wrapper unusable:
        # read() raises ValueError afterwards.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        with self.assertRaises(TypeError):
            wrapper.__init__(buffered, newline=42)
        with self.assertRaises(ValueError):
            wrapper.read()
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        # repr() of an object that never went through __init__ must
        # raise an exception.
        bare = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(bare)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            rawio = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(rawio)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference, so the wrapper sits in a reference cycle.
            wrapper.x = wrapper
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as reader:
            self.assertEqual(reader.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
3065
3066
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests against the `pyio` module.
    io = pyio
    # Text expected on the child interpreter's stderr by the
    # create-at-shutdown test.  A previous expectation was:
    #shutdown_error = "LookupError: unknown encoding: ascii"
    shutdown_error = "TypeError: 'NoneType' object is not iterable"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003071
3072
3073class IncrementalNewlineDecoderTest(unittest.TestCase):
3074
3075 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003076 # UTF-8 specific tests for a newline decoder
3077 def _check_decode(b, s, **kwargs):
3078 # We exercise getstate() / setstate() as well as decode()
3079 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003080 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003081 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003082 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003083
Antoine Pitrou180a3362008-12-14 16:36:46 +00003084 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003085
Antoine Pitrou180a3362008-12-14 16:36:46 +00003086 _check_decode(b'\xe8', "")
3087 _check_decode(b'\xa2', "")
3088 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003089
Antoine Pitrou180a3362008-12-14 16:36:46 +00003090 _check_decode(b'\xe8', "")
3091 _check_decode(b'\xa2', "")
3092 _check_decode(b'\x88', "\u8888")
3093
3094 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003095 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3096
Antoine Pitrou180a3362008-12-14 16:36:46 +00003097 decoder.reset()
3098 _check_decode(b'\n', "\n")
3099 _check_decode(b'\r', "")
3100 _check_decode(b'', "\n", final=True)
3101 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003102
Antoine Pitrou180a3362008-12-14 16:36:46 +00003103 _check_decode(b'\r', "")
3104 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003105
Antoine Pitrou180a3362008-12-14 16:36:46 +00003106 _check_decode(b'\r\r\n', "\n\n")
3107 _check_decode(b'\r', "")
3108 _check_decode(b'\r', "\n")
3109 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003110
Antoine Pitrou180a3362008-12-14 16:36:46 +00003111 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3112 _check_decode(b'\xe8\xa2\x88', "\u8888")
3113 _check_decode(b'\n', "\n")
3114 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3115 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003116
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003117 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003118 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003119 if encoding is not None:
3120 encoder = codecs.getincrementalencoder(encoding)()
3121 def _decode_bytewise(s):
3122 # Decode one byte at a time
3123 for b in encoder.encode(s):
3124 result.append(decoder.decode(bytes([b])))
3125 else:
3126 encoder = None
3127 def _decode_bytewise(s):
3128 # Decode one char at a time
3129 for c in s:
3130 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003131 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003132 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003133 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003134 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003135 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003136 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003137 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003138 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003139 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003140 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003141 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003142 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003143 input = "abc"
3144 if encoder is not None:
3145 encoder.reset()
3146 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003147 self.assertEqual(decoder.decode(input), "abc")
3148 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003149
3150 def test_newline_decoder(self):
3151 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003152 # None meaning the IncrementalNewlineDecoder takes unicode input
3153 # rather than bytes input
3154 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003155 'utf-16', 'utf-16-le', 'utf-16-be',
3156 'utf-32', 'utf-32-le', 'utf-32-be',
3157 )
3158 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003159 decoder = enc and codecs.getincrementaldecoder(enc)()
3160 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3161 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003162 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003163 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3164 self.check_newline_decoding_utf8(decoder)
3165
Antoine Pitrou66913e22009-03-06 23:40:56 +00003166 def test_newline_bytes(self):
3167 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3168 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003169 self.assertEqual(dec.newlines, None)
3170 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3171 self.assertEqual(dec.newlines, None)
3172 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3173 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003174 dec = self.IncrementalNewlineDecoder(None, translate=False)
3175 _check(dec)
3176 dec = self.IncrementalNewlineDecoder(None, translate=True)
3177 _check(dec)
3178
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; it only gives the inherited tests a
    # separate class to run under.
    # NOTE(review): presumably the C IncrementalNewlineDecoder is
    # attached to this class elsewhere in the file -- confirm.
    pass
3181
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; it only gives the inherited tests a
    # separate class to run under.
    # NOTE(review): presumably the pure-Python IncrementalNewlineDecoder
    # is attached to this class elsewhere in the file -- confirm.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003184
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003185
Guido van Rossum01a27522007-03-07 01:00:12 +00003186# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003187
Guido van Rossum5abbf752007-08-27 17:39:33 +00003188class MiscIOTest(unittest.TestCase):
3189
def tearDown(self):
    # Remove the scratch file that tests in this class work on.
    scratch = support.TESTFN
    support.unlink(scratch)
3192
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003193 def test___all__(self):
3194 for name in self.io.__all__:
3195 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003196 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003197 if name == "open":
3198 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003199 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003200 self.assertTrue(issubclass(obj, Exception), name)
3201 elif not name.startswith("SEEK_"):
3202 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003203
def test_attributes(self):
    # Check the `name` and `mode` attributes reported at each layer of
    # the objects returned by open().
    unbuffered = self.open(support.TESTFN, "wb", buffering=0)
    self.assertEqual(unbuffered.mode, "wb")
    unbuffered.close()

    # Opening with mode "U" is expected to emit a DeprecationWarning.
    with support.check_warnings(('', DeprecationWarning)):
        text = self.open(support.TESTFN, "U")
    for layer in (text, text.buffer, text.buffer.raw):
        self.assertEqual(layer.name, support.TESTFN)
    expected_modes = (
        (text, "U"),
        (text.buffer, "rb"),
        (text.buffer.raw, "rb"),
    )
    for layer, mode in expected_modes:
        self.assertEqual(layer.mode, mode)
    text.close()

    updating = self.open(support.TESTFN, "w+")
    self.assertEqual(updating.mode, "w+")
    self.assertEqual(updating.buffer.mode, "rb+")  # Does it really matter?
    self.assertEqual(updating.buffer.raw.mode, "rb+")

    # A second file object on the same descriptor, opened with
    # closefd=False; its name is the descriptor number.
    borrowed = self.open(updating.fileno(), "wb", closefd=False)
    self.assertEqual(borrowed.mode, "wb")
    self.assertEqual(borrowed.raw.mode, "wb")
    self.assertEqual(borrowed.name, updating.fileno())
    self.assertEqual(borrowed.raw.name, updating.fileno())
    updating.close()
    borrowed.close()
3231
def test_io_after_close(self):
    # After close(), the I/O methods of a file object must raise
    # ValueError, for every combination of mode and buffering below.
    configs = []
    for text_mode, binary_mode in (("w", "wb"), ("r", "rb"), ("w+", "w+b")):
        configs += [
            {"mode": text_mode},
            {"mode": binary_mode},
            {"mode": text_mode, "buffering": 1},
            {"mode": text_mode, "buffering": 2},
            {"mode": binary_mode, "buffering": 0},
        ]
    for kwargs in configs:
        f = self.open(support.TESTFN, **kwargs)
        f.close()
        # write() takes bytes in binary modes and str in text modes.
        empty = b"" if "b" in kwargs['mode'] else ""
        # (method name, call arguments, checked only if the method exists)
        checks = (
            ("flush", (), False),
            ("fileno", (), False),
            ("isatty", (), False),
            ("__iter__", (), False),
            ("peek", (1,), True),
            ("read", (), False),
            ("read1", (1024,), True),
            ("readall", (), True),
            ("readinto", (bytearray(1024),), True),
            ("readinto1", (bytearray(1024),), True),
            ("readline", (), False),
            ("readlines", (), False),
            ("seek", (0,), False),
            ("tell", (), False),
            ("truncate", (), False),
            ("write", (empty,), False),
            ("writelines", ([],), False),
        )
        for name, call_args, optional in checks:
            if optional and not hasattr(f, name):
                continue
            self.assertRaises(ValueError, getattr(f, name), *call_args)
        self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003276
def test_blockingioerror(self):
    # Various BlockingIOError issues
    class StrWithAttrs(str):
        # A str subclass, so that instances accept attributes and
        # weak references.
        pass

    payload = StrWithAttrs("")
    error = self.BlockingIOError(1, payload)
    # Tie the exception and its argument into a reference cycle; the
    # cycle must be collectable.
    payload.b = error
    error.c = payload
    ref = weakref.ref(payload)
    del payload, error
    support.gc_collect()
    self.assertTrue(ref() is None, ref)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003289
3290 def test_abcs(self):
3291 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003292 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3293 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3294 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3295 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003296
def _check_abc_inheritance(self, abcmodule):
    # Open the scratch file in three ways and check which of the ABCs
    # found on *abcmodule* each resulting object is an instance of.
    abc_names = ("IOBase", "RawIOBase", "BufferedIOBase", "TextIOBase")
    # (positional args after the filename, keyword args,
    #  expected membership for each name in abc_names)
    cases = (
        (("wb",), {"buffering": 0}, (True, True, False, False)),
        (("wb",), {}, (True, False, True, False)),
        (("w",), {}, (True, False, False, True)),
    )
    for extra_args, extra_kwargs, membership in cases:
        with self.open(support.TESTFN, *extra_args, **extra_kwargs) as f:
            for name, expected in zip(abc_names, membership):
                base = getattr(abcmodule, name)
                if expected:
                    self.assertIsInstance(f, base)
                else:
                    self.assertNotIsInstance(f, base)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003313
3314 def test_abc_inheritance(self):
3315 # Test implementations inherit from their respective ABCs
3316 self._check_abc_inheritance(self)
3317
3318 def test_abc_inheritance_official(self):
3319 # Test implementations inherit from the official ABCs of the
3320 # baseline "io" module.
3321 self._check_abc_inheritance(io)
3322
def _check_warn_on_dealloc(self, *args, **kwargs):
    # Open a file, drop the only reference to it without closing, and
    # check that a ResourceWarning mentioning the object's repr is
    # emitted.
    # NOTE(review): this calls the builtin open(), not self.open --
    # confirm that is intended.
    leaked = open(*args, **kwargs)
    expected_repr = repr(leaked)
    with self.assertWarns(ResourceWarning) as caught:
        leaked = None
        support.gc_collect()
    message = str(caught.warning.args[0])
    self.assertIn(expected_repr, message)
3330
def test_warn_on_dealloc(self):
    # Run the dealloc-warning check for three ways of opening the
    # scratch file for writing.
    variants = (
        ("wb", {"buffering": 0}),
        ("wb", {}),
        ("w", {}),
    )
    for mode, extra in variants:
        self._check_warn_on_dealloc(support.TESTFN, mode, **extra)
3335
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
    # Check the dealloc warning for files opened from a file descriptor:
    # a warning is expected when the file object owns the descriptor,
    # and none when it is opened with closefd=False.
    #
    # *args and **kwargs are forwarded to open() after the descriptor.
    fds = []

    def cleanup_fds():
        # Close every pipe end created below.  EBADF is ignored because
        # some descriptors are already closed by the time this runs.
        for fd in fds:
            try:
                os.close(fd)
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise

    self.addCleanup(cleanup_fds)
    r, w = os.pipe()
    fds += r, w
    self._check_warn_on_dealloc(r, *args, **kwargs)
    # When using closefd=False, there's no warning
    r, w = os.pipe()
    fds += r, w
    with warnings.catch_warnings(record=True) as recorded:
        # ResourceWarning is ignored by the default filters, and a
        # "default"/"once" filter may already have reported it from this
        # location.  Force it on, otherwise the assertion below could
        # pass without checking anything.
        warnings.simplefilter("always", ResourceWarning)
        open(r, *args, closefd=False, **kwargs)
        support.gc_collect()
    self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00003356
3357 def test_warn_on_dealloc_fd(self):
3358 self._check_warn_on_dealloc_fd("rb", buffering=0)
3359 self._check_warn_on_dealloc_fd("rb")
3360 self._check_warn_on_dealloc_fd("r")
3361
3362
def test_pickling(self):
    # Pickling file objects is forbidden
    configs = []
    for text_mode, binary_mode in (("w", "wb"), ("r", "rb"), ("w+", "w+b")):
        configs.append({"mode": text_mode})
        configs.append({"mode": binary_mode})
        configs.append({"mode": binary_mode, "buffering": 0})
    protocols = range(pickle.HIGHEST_PROTOCOL + 1)
    for config in configs:
        for protocol in protocols:
            with self.open(support.TESTFN, **config) as f:
                with self.assertRaises(TypeError):
                    pickle.dumps(f, protocol)
3379
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003380 def test_nonblock_pipe_write_bigbuf(self):
3381 self._test_nonblock_pipe_write(16*1024)
3382
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003383 def test_nonblock_pipe_write_smallbuf(self):
3384 self._test_nonblock_pipe_write(1024)
3385
@unittest.skipUnless(hasattr(os, 'set_blocking'),
                     'os.set_blocking() required for this test')
def _test_nonblock_pipe_write(self, bufsize):
    """Write through a non-blocking pipe until it blocks, then drain it.

    Both ends of an os.pipe() are made non-blocking and wrapped in
    buffered binary file objects using *bufsize*.  Data is written until
    BlockingIOError is raised, for three different message sizes, and
    everything read back must equal everything reported as written.
    """
    sent = []
    received = []
    r, w = os.pipe()
    os.set_blocking(r, False)
    os.set_blocking(w, False)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes.  For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
    wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

    with rf, wf:
        for N in 9999, 73, 7574:
            try:
                i = 0
                while True:
                    # N copies of a single lowercase letter, cycling a..z.
                    msg = bytes([i % 26 + 97]) * N
                    sent.append(msg)
                    wf.write(msg)
                    i += 1

            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                # Only the part that was actually accepted counts as sent.
                sent[-1] = sent[-1][:e.characters_written]
                received.append(rf.read())
                msg = b'BLOCKED'
                wf.write(msg)
                sent.append(msg)

        # Keep flushing, draining the read end each time the flush
        # would block, until the writer's buffer is empty.
        while True:
            try:
                wf.flush()
                break
            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                self.assertEqual(e.characters_written, 0)
                received.append(rf.read())

        # Read whatever is left; read() returning None ends the loop.
        received += iter(rf.read, None)

    sent, received = b''.join(sent), b''.join(received)
    # assertEqual (rather than assertTrue(a == b)) reports the mismatch.
    self.assertEqual(sent, received)
    self.assertTrue(wf.closed)
    self.assertTrue(rf.closed)
3437
def test_create_fail(self):
    # 'x' mode must fail when the file already exists.
    # First make sure the file is there...
    self.open(support.TESTFN, 'w').close()
    # ...then check that exclusive creation is refused.
    with self.assertRaises(FileExistsError):
        self.open(support.TESTFN, 'x')
3443
def test_create_writes(self):
    # A file opened in 'x' mode is open for writing.
    payload = b"spam"
    with self.open(support.TESTFN, 'xb') as writer:
        writer.write(payload)
    with self.open(support.TESTFN, 'rb') as reader:
        contents = reader.read()
    self.assertEqual(payload, contents)
3450
def test_open_allargs(self):
    # Regression test: there used to be a buffer overflow in the
    # parser for rawmode.
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, 'rwax+')
3454
3455
class CMiscIOTest(MiscIOTest):
    """MiscIOTest bound to ``io``, plus tests that only apply to it."""

    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025: readinto() must reject a read() result that is
        # larger than the destination buffer.
        class OversizedReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6

        target = bytearray(2)
        with self.assertRaises(ValueError):
            OversizedReader().readinto(target)

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        template = """if 1:
            import sys
            import time
            import threading

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """
        code = template.format(stream_name=stream_name)
        res, _ = run_python_until_end("-c", code)
        err = res.err.decode()
        if res.rc == 0:
            # Clean exit: stderr may only hold the '.' and '!' markers.
            self.assertFalse(err.strip('.!'))
            return
        # Failure: should be a fatal error
        expected = ("Fatal Python error: could not acquire lock "
                    "for <_io.BufferedWriter name='<{stream_name}>'> "
                    "at interpreter shutdown, possibly due to "
                    "daemon threads".format(stream_name=stream_name))
        self.assertIn(expected, err)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock(stream_name='stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock(stream_name='stderr')
3509
3510
class PyMiscIOTest(MiscIOTest):
    # Same tests as MiscIOTest, with ``io`` bound to ``pyio``.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003513
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003514
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Tests of io reads and writes that get interrupted by SIGALRM.

    Subclasses set the ``io`` class attribute to the implementation
    under test.  setUp() installs alarm_interrupt() as the SIGALRM
    handler and tearDown() restores the previous handler; individual
    checks may install their own handler in between.
    """

    def setUp(self):
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError: the tests look for this
        # exception to prove that the handler ran.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is repeated until it exceeds support.PIPE_MAX_SIZE and is
        written through a file object opened on the pipe's write end
        with *fdopen_kwargs*; *bytes* holds the first two bytes expected
        on the read end."""
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            if hasattr(signal, 'pthread_sigmask'):
                # Create the thread with SIGALRM already blocked, so that
                # it inherits the mask and there is no window in which
                # the signal could be delivered to it instead of to the
                # main thread.
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
                try:
                    t.start()
                finally:
                    signal.pthread_sigmask(signal.SIG_UNBLOCK,
                                           [signal.SIGALRM])
            else:
                t.start()
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed below.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    # Issue #22331: The test hangs on FreeBSD 7.2
    @support.requires_freebsd_version(8)
    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check a write() issued from a signal handler while another
        write() on the same object is in progress.

        Either the reentrant call fails with a RuntimeError whose message
        starts with "reentrant call", or the handler's ZeroDivisionError
        propagates."""
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while True:
                    for _ in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel the alarm in case we got here before it fired.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to str so that it
        can be compared with "foobar"."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel the alarm in case we got here before it fired.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is repeated support.PIPE_MAX_SIZE times to build the data
        written through a file object opened with *fdopen_kwargs*."""
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            nonlocal error
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand the failure over to the main thread.
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: re-arm with the handler that starts the reader.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel the alarm in case we got here before it fired.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3722
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003723
class CSignalsTest(SignalsTest):
    # Same tests as SignalsTest, with ``io`` bound to the ``io`` module.
    io = io
3726
class PySignalsTest(SignalsTest):
    # Same tests as SignalsTest, with ``io`` bound to ``pyio``.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3734
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003735
def load_tests(*args):
    """Build and return the test suite for this module.

    Before the suite is assembled, every test class whose name starts
    with "C" receives, as class attributes, the names listed in
    ``io.__all__`` (plus ``IncrementalNewlineDecoder``) taken from ``io``
    and the "C"-prefixed variants of the mock classes; classes whose name
    starts with "Py" receive the same names taken from ``pyio`` and the
    "Py"-prefixed mocks.  Other classes are left untouched.  The
    positional arguments are ignored.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    globs = globals()
    namespaces = {}
    for prefix, module in (("C", io), ("Py", pyio)):
        ns = {name: getattr(module, name) for name in all_members}
        # The mock classes exist in a C and a Py flavour, named after
        # the base mock with the prefix prepended.
        ns.update((x.__name__, globs[prefix + x.__name__]) for x in mocks)
        namespaces[prefix] = ns
    # Avoid turning open into a bound method.
    namespaces["Py"]["open"] = pyio.OpenWrapper

    for test in tests:
        for prefix in ("C", "Py"):
            if test.__name__.startswith(prefix):
                for name, obj in namespaces[prefix].items():
                    setattr(test, name, obj)
                break

    # unittest.makeSuite() is deprecated (and gone in recent Python
    # versions); a plain TestLoader collects the same "test*" methods.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite(
        [loader.loadTestsFromTestCase(test) for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003771
# Run the tests when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()