blob: 28f440d91ad29f569c11fb92011c65a2a38e4c13 [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Berker Peksagce643912015-05-06 06:33:17 +030038from test.support.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
def _default_chunk_size():
    """Return the default TextIOWrapper chunk size.

    This very file is opened in text mode purely to get hold of a text
    stream whose ``_CHUNK_SIZE`` attribute can be read.
    """
    with open(__file__, "r", encoding="latin-1") as text_stream:
        chunk_size = text_stream._CHUNK_SIZE
    return chunk_size
52
53
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto()."""

    def __init__(self, read_stack=()):
        # Chunks still to be served by readinto(); a None entry is handed
        # back to the caller as None.
        self._read_stack = list(read_stack)
        # Every payload passed to write(), in call order.
        self._write_stack = []
        # Bookkeeping: total read calls, and calls made after the queue
        # had run dry.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        # Record an immutable copy and claim the whole payload was written.
        payload = bytes(b)
        self._write_stack.append(payload)
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the head of the read queue into *buf*.

        Returns the number of bytes copied, None when the queued entry is
        None, or 0 once the queue is exhausted.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            del self._read_stack[0]
            return None
        size = len(chunk)
        if size > capacity:
            # Only part of the chunk fits: serve that part and keep the
            # remainder queued for the next call.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        del self._read_stack[0]
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        return pos
109
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    # The read()-less mock layered over the C implementation's RawIOBase.
    pass
112
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    # The read()-less mock layered over the pure-Python RawIOBase.
    pass
115
116
class MockRawIO(MockRawIOWithoutRead):
    """Raw I/O mock that adds an explicit read() to the read()-less mock."""

    def read(self, n=None):
        """Pop and return the next queued chunk; *n* is ignored.

        Once the queue is empty, the call is counted as an extraneous read
        and b"" is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue is expected here; anything else
            # (including KeyboardInterrupt) must not be silently swallowed.
            self._extraneous_reads += 1
            return b""
126
class CMockRawIO(MockRawIO, io.RawIOBase):
    # MockRawIO layered over the C implementation's RawIOBase.
    pass
129
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    # MockRawIO layered over the pure-Python RawIOBase.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000132
Guido van Rossuma9e20242007-03-08 00:43:48 +0000133
class MisbehavedRawIO(MockRawIO):
    # Each override reports a result that does not match what the parent
    # mock actually did.

    def write(self, b):
        # Claim twice as many bytes as the parent recorded.
        written = super().write(b)
        return written * 2

    def read(self, n=None):
        # Hand back the parent's chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then report five times its size.
        super().readinto(buf)
        return 5 * len(buf)
150
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    # MisbehavedRawIO layered over the C implementation's RawIOBase.
    pass
153
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    # MisbehavedRawIO layered over the pure-Python RawIOBase.
    pass
156
157
class CloseFailureIO(MockRawIO):
    # The first close() marks the object closed and then raises OSError;
    # later calls do nothing.
    closed = 0

    def close(self):
        if self.closed:
            return
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000165
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    # CloseFailureIO layered over the C implementation's RawIOBase.
    pass
168
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    # CloseFailureIO layered over the pure-Python RawIOBase.
    pass
171
172
class MockFileIO:
    # Mixin that logs the outcome of every read()/readinto() call in
    # ``read_history`` while delegating the actual work to the next class
    # in the MRO.

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        # Log the size of what came back, or None when nothing did.
        size = len(chunk) if chunk is not None else None
        self.read_history.append(size)
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000188
class CMockFileIO(MockFileIO, io.BytesIO):
    # Read-logging mixin layered over the C implementation's BytesIO.
    pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000191
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    # Read-logging mixin layered over the pure-Python BytesIO.
    pass
194
195
class MockUnseekableIO:
    # Mixin that makes a stream report itself as unseekable. Concrete
    # subclasses must provide the ``UnsupportedOperation`` exception class
    # that seek() and tell() raise.

    def seekable(self):
        return False

    def seek(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")

    def tell(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")
205
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    # Exception class the mixin raises, taken from the C implementation.
    UnsupportedOperation = io.UnsupportedOperation
208
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    # Exception class the mixin raises, taken from the pure-Python
    # implementation.
    UnsupportedOperation = pyio.UnsupportedOperation
211
212
class MockNonBlockWriterIO:
    # Writer mock that can simulate a non-blocking stream: once a blocker
    # has been armed with block_on(), write() stops short of it.

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        # Return everything written so far and empty the log in place.
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        data = bytes(b)
        blocker = self._blocker_char
        if blocker:
            position = data.find(blocker)
            if position > 0:
                # write data up to the first blocker
                self._write_stack.append(data[:position])
                return position
            if position == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        # No blocker armed, or it does not occur in this payload.
        self._write_stack.append(data)
        return len(data)
256
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    # BlockingIOError as exposed by the C implementation.
    BlockingIOError = io.BlockingIOError
259
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    # BlockingIOError as exposed by the pure-Python implementation.
    BlockingIOError = pyio.BlockingIOError
262
Guido van Rossuma9e20242007-03-08 00:43:48 +0000263
Guido van Rossum28524c72007-02-27 05:47:44 +0000264class IOTest(unittest.TestCase):
265
Neal Norwitze7789b12008-03-24 06:18:09 +0000266 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000267 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000268
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000269 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000270 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000271
Guido van Rossum28524c72007-02-27 05:47:44 +0000272 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000273 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000274 f.truncate(0)
275 self.assertEqual(f.tell(), 5)
276 f.seek(0)
277
278 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000279 self.assertEqual(f.seek(0), 0)
280 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000281 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000282 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000283 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000284 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000285 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000286 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000287 self.assertEqual(f.seek(-1, 2), 13)
288 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000289
Guido van Rossum87429772007-04-10 21:06:59 +0000290 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000291 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000292 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000293
Guido van Rossum9b76da62007-04-11 01:09:03 +0000294 def read_ops(self, f, buffered=False):
295 data = f.read(5)
296 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000297 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000298 self.assertEqual(f.readinto(data), 5)
299 self.assertEqual(data, b" worl")
300 self.assertEqual(f.readinto(data), 2)
301 self.assertEqual(len(data), 5)
302 self.assertEqual(data[:2], b"d\n")
303 self.assertEqual(f.seek(0), 0)
304 self.assertEqual(f.read(20), b"hello world\n")
305 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000306 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000307 self.assertEqual(f.seek(-6, 2), 6)
308 self.assertEqual(f.read(5), b"world")
309 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000310 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000311 self.assertEqual(f.seek(-6, 1), 5)
312 self.assertEqual(f.read(5), b" worl")
313 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000314 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 if buffered:
316 f.seek(0)
317 self.assertEqual(f.read(), b"hello world\n")
318 f.seek(6)
319 self.assertEqual(f.read(), b"world\n")
320 self.assertEqual(f.read(), b"")
321
Guido van Rossum34d69e52007-04-10 20:08:41 +0000322 LARGE = 2**31
323
Guido van Rossum53807da2007-04-10 19:01:47 +0000324 def large_file_ops(self, f):
325 assert f.readable()
326 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000327 self.assertEqual(f.seek(self.LARGE), self.LARGE)
328 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000329 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000330 self.assertEqual(f.tell(), self.LARGE + 3)
331 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000332 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000333 self.assertEqual(f.tell(), self.LARGE + 2)
334 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000335 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000336 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000337 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
338 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000339 self.assertEqual(f.read(2), b"x")
340
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000341 def test_invalid_operations(self):
342 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000343 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000344 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000345 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000346 self.assertRaises(exc, fp.read)
347 self.assertRaises(exc, fp.readline)
348 with self.open(support.TESTFN, "wb", buffering=0) as fp:
349 self.assertRaises(exc, fp.read)
350 self.assertRaises(exc, fp.readline)
351 with self.open(support.TESTFN, "rb", buffering=0) as fp:
352 self.assertRaises(exc, fp.write, b"blah")
353 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000354 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000355 self.assertRaises(exc, fp.write, b"blah")
356 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000357 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000358 self.assertRaises(exc, fp.write, "blah")
359 self.assertRaises(exc, fp.writelines, ["blah\n"])
360 # Non-zero seeking from current or end pos
361 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
362 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000363
Antoine Pitrou13348842012-01-29 18:36:34 +0100364 def test_open_handles_NUL_chars(self):
365 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300366 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
367 self.assertRaises(ValueError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100368
Guido van Rossum28524c72007-02-27 05:47:44 +0000369 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000370 with self.open(support.TESTFN, "wb", buffering=0) as f:
371 self.assertEqual(f.readable(), False)
372 self.assertEqual(f.writable(), True)
373 self.assertEqual(f.seekable(), True)
374 self.write_ops(f)
375 with self.open(support.TESTFN, "rb", buffering=0) as f:
376 self.assertEqual(f.readable(), True)
377 self.assertEqual(f.writable(), False)
378 self.assertEqual(f.seekable(), True)
379 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000380
Guido van Rossum87429772007-04-10 21:06:59 +0000381 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000382 with self.open(support.TESTFN, "wb") as f:
383 self.assertEqual(f.readable(), False)
384 self.assertEqual(f.writable(), True)
385 self.assertEqual(f.seekable(), True)
386 self.write_ops(f)
387 with self.open(support.TESTFN, "rb") as f:
388 self.assertEqual(f.readable(), True)
389 self.assertEqual(f.writable(), False)
390 self.assertEqual(f.seekable(), True)
391 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000392
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000393 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000394 with self.open(support.TESTFN, "wb") as f:
395 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
396 with self.open(support.TESTFN, "rb") as f:
397 self.assertEqual(f.readline(), b"abc\n")
398 self.assertEqual(f.readline(10), b"def\n")
399 self.assertEqual(f.readline(2), b"xy")
400 self.assertEqual(f.readline(4), b"zzy\n")
401 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000402 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000403 self.assertRaises(TypeError, f.readline, 5.3)
404 with self.open(support.TESTFN, "r") as f:
405 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000406
Guido van Rossum28524c72007-02-27 05:47:44 +0000407 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000408 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000409 self.write_ops(f)
410 data = f.getvalue()
411 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000412 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000413 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000414
Guido van Rossum53807da2007-04-10 19:01:47 +0000415 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000416 # On Windows and Mac OSX this test comsumes large resources; It takes
417 # a long time to build the >2GB file and takes >2GB of disk space
418 # therefore the resource must be enabled to run this test.
419 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600420 support.requires(
421 'largefile',
422 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000423 with self.open(support.TESTFN, "w+b", 0) as f:
424 self.large_file_ops(f)
425 with self.open(support.TESTFN, "w+b") as f:
426 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000427
428 def test_with_open(self):
429 for bufsize in (0, 1, 100):
430 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000431 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000432 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000433 self.assertEqual(f.closed, True)
434 f = None
435 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000436 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000437 1/0
438 except ZeroDivisionError:
439 self.assertEqual(f.closed, True)
440 else:
441 self.fail("1/0 didn't raise an exception")
442
Antoine Pitrou08838b62009-01-21 00:55:13 +0000443 # issue 5008
444 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000445 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000446 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000447 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000448 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000449 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000450 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000451 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300452 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000453
Guido van Rossum87429772007-04-10 21:06:59 +0000454 def test_destructor(self):
455 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000456 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000457 def __del__(self):
458 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000459 try:
460 f = super().__del__
461 except AttributeError:
462 pass
463 else:
464 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000465 def close(self):
466 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000467 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000468 def flush(self):
469 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000470 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000471 with support.check_warnings(('', ResourceWarning)):
472 f = MyFileIO(support.TESTFN, "wb")
473 f.write(b"xxx")
474 del f
475 support.gc_collect()
476 self.assertEqual(record, [1, 2, 3])
477 with self.open(support.TESTFN, "rb") as f:
478 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000479
480 def _check_base_destructor(self, base):
481 record = []
482 class MyIO(base):
483 def __init__(self):
484 # This exercises the availability of attributes on object
485 # destruction.
486 # (in the C version, close() is called by the tp_dealloc
487 # function, not by __del__)
488 self.on_del = 1
489 self.on_close = 2
490 self.on_flush = 3
491 def __del__(self):
492 record.append(self.on_del)
493 try:
494 f = super().__del__
495 except AttributeError:
496 pass
497 else:
498 f()
499 def close(self):
500 record.append(self.on_close)
501 super().close()
502 def flush(self):
503 record.append(self.on_flush)
504 super().flush()
505 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000506 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000507 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000508 self.assertEqual(record, [1, 2, 3])
509
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000510 def test_IOBase_destructor(self):
511 self._check_base_destructor(self.IOBase)
512
513 def test_RawIOBase_destructor(self):
514 self._check_base_destructor(self.RawIOBase)
515
516 def test_BufferedIOBase_destructor(self):
517 self._check_base_destructor(self.BufferedIOBase)
518
519 def test_TextIOBase_destructor(self):
520 self._check_base_destructor(self.TextIOBase)
521
Guido van Rossum87429772007-04-10 21:06:59 +0000522 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000523 with self.open(support.TESTFN, "wb") as f:
524 f.write(b"xxx")
525 with self.open(support.TESTFN, "rb") as f:
526 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000527
Guido van Rossumd4103952007-04-12 05:44:49 +0000528 def test_array_writes(self):
529 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000530 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000531 with self.open(support.TESTFN, "wb", 0) as f:
532 self.assertEqual(f.write(a), n)
533 with self.open(support.TESTFN, "wb") as f:
534 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000535
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000536 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000537 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000538 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000539
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000540 def test_read_closed(self):
541 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000542 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000543 with self.open(support.TESTFN, "r") as f:
544 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000545 self.assertEqual(file.read(), "egg\n")
546 file.seek(0)
547 file.close()
548 self.assertRaises(ValueError, file.read)
549
550 def test_no_closefd_with_filename(self):
551 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000552 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000553
554 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000555 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000556 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000557 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000558 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000559 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000560 self.assertEqual(file.buffer.raw.closefd, False)
561
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000562 def test_garbage_collection(self):
563 # FileIO objects are collected, and collecting them flushes
564 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000565 with support.check_warnings(('', ResourceWarning)):
566 f = self.FileIO(support.TESTFN, "wb")
567 f.write(b"abcxxx")
568 f.f = f
569 wr = weakref.ref(f)
570 del f
571 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300572 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000573 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000574 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000575
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000576 def test_unbounded_file(self):
577 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
578 zero = "/dev/zero"
579 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000580 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000581 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000582 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000583 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000584 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000585 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000586 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000587 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000588 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000589 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000590 self.assertRaises(OverflowError, f.read)
591
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200592 def check_flush_error_on_close(self, *args, **kwargs):
593 # Test that the file is closed despite failed flush
594 # and that flush() is called before file closed.
595 f = self.open(*args, **kwargs)
596 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000597 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200598 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200599 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000600 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200601 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600602 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200603 self.assertTrue(closed) # flush() called
604 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200605 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200606
607 def test_flush_error_on_close(self):
608 # raw file
609 # Issue #5700: io.FileIO calls flush() after file closed
610 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
611 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
612 self.check_flush_error_on_close(fd, 'wb', buffering=0)
613 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
614 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
615 os.close(fd)
616 # buffered io
617 self.check_flush_error_on_close(support.TESTFN, 'wb')
618 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
619 self.check_flush_error_on_close(fd, 'wb')
620 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
621 self.check_flush_error_on_close(fd, 'wb', closefd=False)
622 os.close(fd)
623 # text io
624 self.check_flush_error_on_close(support.TESTFN, 'w')
625 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
626 self.check_flush_error_on_close(fd, 'w')
627 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
628 self.check_flush_error_on_close(fd, 'w', closefd=False)
629 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000630
631 def test_multi_close(self):
632 f = self.open(support.TESTFN, "wb", buffering=0)
633 f.close()
634 f.close()
635 f.close()
636 self.assertRaises(ValueError, f.flush)
637
Antoine Pitrou328ec742010-09-14 18:37:24 +0000638 def test_RawIOBase_read(self):
639 # Exercise the default RawIOBase.read() implementation (which calls
640 # readinto() internally).
641 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
642 self.assertEqual(rawio.read(2), b"ab")
643 self.assertEqual(rawio.read(2), b"c")
644 self.assertEqual(rawio.read(2), b"d")
645 self.assertEqual(rawio.read(2), None)
646 self.assertEqual(rawio.read(2), b"ef")
647 self.assertEqual(rawio.read(2), b"g")
648 self.assertEqual(rawio.read(2), None)
649 self.assertEqual(rawio.read(2), b"")
650
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400651 def test_types_have_dict(self):
652 test = (
653 self.IOBase(),
654 self.RawIOBase(),
655 self.TextIOBase(),
656 self.StringIO(),
657 self.BytesIO()
658 )
659 for obj in test:
660 self.assertTrue(hasattr(obj, "__dict__"))
661
Ross Lagerwall59142db2011-10-31 20:34:46 +0200662 def test_opener(self):
663 with self.open(support.TESTFN, "w") as f:
664 f.write("egg\n")
665 fd = os.open(support.TESTFN, os.O_RDONLY)
666 def opener(path, flags):
667 return fd
668 with self.open("non-existent", "r", opener=opener) as f:
669 self.assertEqual(f.read(), "egg\n")
670
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200671 def test_fileio_closefd(self):
672 # Issue #4841
673 with self.open(__file__, 'rb') as f1, \
674 self.open(__file__, 'rb') as f2:
675 fileio = self.FileIO(f1.fileno(), closefd=False)
676 # .__init__() must not close f1
677 fileio.__init__(f2.fileno(), closefd=False)
678 f1.readline()
679 # .close() must not close f2
680 fileio.close()
681 f2.readline()
682
def test_nonbuffered_textio(self):
    # Asking for an unbuffered text file is a ValueError, and the failed
    # open() must not leave anything behind that emits a warning when it
    # is garbage collected.
    with warnings.catch_warnings(record=True) as caught:
        with self.assertRaises(ValueError):
            self.open(support.TESTFN, 'w', buffering=0)
        support.gc_collect()
    self.assertEqual(caught, [])
689
def test_invalid_newline(self):
    # An unsupported newline argument is a ValueError, and the failed
    # open() must not leave anything behind that emits a warning when it
    # is garbage collected.
    with warnings.catch_warnings(record=True) as caught:
        with self.assertRaises(ValueError):
            self.open(support.TESTFN, 'w', newline='invalid')
        support.gc_collect()
    self.assertEqual(caught, [])
696
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200697
class CIOTest(IOTest):
    # IOTest run against one implementation, plus a finalization
    # regression test.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                return None

        # create an instance to populate the method cache
        MyIO()
        cyclic = MyIO()
        cyclic.obj = cyclic  # self-reference: only the GC can free it
        ref = weakref.ref(cyclic)
        # Drop the class first, then the instance, so both are only kept
        # alive by the cycle when the collector runs.
        del MyIO
        del cyclic
        support.gc_collect()
        self.assertIsNone(ref(), ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000717
class PyIOTest(IOTest):
    """Run the IOTest suite unchanged; adds no tests of its own."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000720
Guido van Rossuma9e20242007-03-08 00:43:48 +0000721
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Cross-checks the public API of RawIOBase between io and pyio using
    # support.detect_api_mismatch(); an empty result means no mismatch.

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        # __weakref__ is excluded from the comparison.
        missing = support.detect_api_mismatch(
            pyio.RawIOBase, io.RawIOBase, ignore=('__weakref__',))
        self.assertEqual(
            missing, set(),
            msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(
            missing, set(),
            msg='C RawIOBase does not have all Python RawIOBase methods')
735
736
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    #
    # This is a mixin: the concrete test classes provide ``tp`` (the buffered
    # class under test) and the mock raw streams reached through ``self``
    # (MockRawIO, CloseFailureIO, MockUnseekableIO, ...).

    def test_detach(self):
        # detach() returns the raw stream once; a second call is an error.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        self.assertRaises(ValueError, buf.detach)

        repr(buf)  # Should still work

    def test_fileno(self):
        # fileno() is forwarded to the raw stream (the mock reports 42).
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    @unittest.skip('test having existential crisis')
    def test_no_fileno(self):
        # XXX will we always have fileno() function?  If so, kill
        # this test.  Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        for bad_whence in (-1, 9):
            self.assertRaises(ValueError, bufio.seek, 0, bad_whence)

    def test_override_destructor(self):
        # A subclass overriding __del__, close() and flush() must see them
        # invoked in that order when the object is destroyed; flush() is
        # only expected for writable objects.
        tp = self.tp
        record = []

        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()

            def close(self):
                record.append(2)
                super().close()

            def flush(self):
                record.append(3)
                super().flush()

        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        def _with():
            with bufio:
                pass

        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()

        def f():
            self.tp(rawio).xyzzy

        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception OSError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        # repr() shows the class name, plus the raw stream's name if any.
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        closed = []

        def bad_flush():
            # Snapshot the closed state at the moment flush() runs.
            closed[:] = [b.closed, raw.closed]
            raise OSError()

        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(OSError, b.close)  # exception not swallowed
        self.assertTrue(b.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(closed)      # flush() called
        self.assertFalse(closed[0])  # flush() called before file closed
        self.assertFalse(closed[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, close() must raise the
        # close error with the flush error chained as its __context__.
        raw = self.MockRawIO()

        def bad_flush():
            raise OSError('flush')

        def bad_close():
            raise OSError('close')

        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(OSError) as err:  # exception not swallowed
            b.close()
        self.assertEqual(err.exception.args, ('close',))
        self.assertIsInstance(err.exception.__context__, OSError)
        self.assertEqual(err.exception.__context__.args, ('flush',))
        self.assertFalse(b.closed)

        # The object is still open, so its finalizer will call close()
        # again.  Neutralise the failing overrides so that this does not
        # raise a second time from inside the destructor.
        raw.close = lambda: None
        b.flush = lambda: None

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        # Same as test_close_error_on_close, but both failures are NameErrors
        # triggered by referencing undefined names.
        raw = self.MockRawIO()

        def bad_flush():
            raise non_existing_flush

        def bad_close():
            raise non_existing_close

        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(NameError) as err:  # exception not swallowed
            b.close()
        self.assertIn('non_existing_close', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('non_existing_flush', str(err.exception.__context__))
        self.assertFalse(b.closed)

        # See test_close_error_on_close: keep the finalizer from raising.
        raw.close = lambda: None
        b.flush = lambda: None

    def test_multi_close(self):
        # close() is idempotent; flush() on a closed object raises.
        raw = self.MockRawIO()
        b = self.tp(raw)
        for _ in range(3):
            b.close()
        self.assertRaises(ValueError, b.flush)

    def test_unseekable(self):
        # tell() and seek() are unsupported over an unseekable raw stream.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)

    def test_readonly_attributes(self):
        # The ``raw`` attribute cannot be rebound.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        x = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buf.raw = x
902
Guido van Rossum78892e42007-04-06 17:31:18 +0000903
class SizeofTest:
    # Mixin checking how sys.getsizeof() accounts for the internal buffer.
    # The host test class provides ``tp`` and ``MockRawIO``.

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must grow by exactly the difference between the
        # two buffer sizes.
        small, large = 4096, 8192
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=small)
        overhead = sys.getsizeof(bufio) - small
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=large)
        self.assertEqual(sys.getsizeof(bufio), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the buffer no longer counts towards the size.
        bufsize = 4096
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize)
        overhead = sys.getsizeof(bufio) - bufsize
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200925
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Reader-specific tests; ``tp`` is supplied by the concrete subclasses.
    read_mode = "rb"

    def test_constructor(self):
        # __init__() may be called again with valid sizes; non-positive
        # buffer sizes are rejected.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        for good_size in (1024, 16):
            bufio.__init__(rawio, buffer_size=good_size)
        self.assertEqual(b"abc", bufio.read())
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object created through __new__() alone can be deleted safely,
        # refuses to read, and becomes usable once __init__() has run.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        # (size passed to read1(), expected data, expected raw read count)
        steps = (
            (1, b"b", 1),
            (100, b"c", 1),
            (100, b"d", 2),
            (100, b"efg", 3),
            (100, b"", 4),
        )
        for size, expected, raw_reads in steps:
            self.assertEqual(expected, bufio.read1(size))
            self.assertEqual(rawio._reads, raw_reads)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        # Each pair is (expected return value, expected buffer contents)
        # for one readinto() call on the shared two-byte buffer.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        for count, contents in ((2, b"ab"), (2, b"cd"), (2, b"ef"),
                                (1, b"gf"), (0, b"gf")):
            self.assertEqual(bufio.readinto(b), count)
            self.assertEqual(b, contents)
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        for count, contents in ((2, b"ab"), (1, b"cb")):
            self.assertEqual(bufio.readinto(b), count)
            self.assertEqual(b, contents)

    def test_readinto1(self):
        # Tracks rawio._reads to check how many raw reads each readinto1()
        # call triggers.
        buffer_size = 10
        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
        bufio = self.tp(rawio, buffer_size=buffer_size)
        b = bytearray(2)
        self.assertEqual(bufio.peek(3), b'abc')
        self.assertEqual(rawio._reads, 1)

        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(rawio._reads, 1)

        self.assertEqual(bufio.readinto1(b), 1)
        self.assertEqual(b[:1], b"c")
        self.assertEqual(rawio._reads, 1)

        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"de")
        self.assertEqual(rawio._reads, 2)

        b = bytearray(2*buffer_size)
        self.assertEqual(bufio.peek(3), b'fgh')
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(bufio.readinto1(b), 6)
        self.assertEqual(b[:6], b"fghjkl")
        self.assertEqual(rawio._reads, 4)

    def _check_readinto_array(self, method_name):
        # Shared body of the two array tests; *method_name* selects which
        # buffered-reader method fills the array.
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        assert len(b) != 16

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = getattr(bufio, method_name)(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readinto_array(self):
        self._check_readinto_array("readinto")

    def test_readinto1_array(self):
        self._check_readinto_array("readinto1")

    def test_readlines(self):
        def make_reader():
            # A fresh reader per call, since readlines() consumes the data.
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

        self.assertEqual(make_reader().readlines(),
                         [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(make_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(make_reader().readlines(None),
                         [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each case: buffer size, sizes passed to bufio.read(), and the
        # read sizes expected in the raw stream's read_history.
        tests = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        # read() without a size gathers every chunk.
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode,
                           buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []

                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=reader)
                           for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(combined.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # tell()/seek() stay unsupported both before and after a read.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        # seek() and tell() over the misbehaving mock must raise OSError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            # Simple case: one raw read is enough to satisfy the request.
            # Then a more complex case where two raw reads are needed to
            # satisfy the request.
            for chunks in ([b"x" * n], [b"x" * (n - 1), b"x"]):
                rawio = self.MockRawIO(chunks)
                bufio = self.tp(rawio, bufsize)
                self.assertEqual(bufio.read(n), b"x" * n)
                self.assertEqual(rawio._extraneous_reads, 0,
                    "failed for {}: {} != 0".format(
                        n, rawio._extraneous_reads))

    def test_read_on_closed(self):
        # Issue #23796
        reader = io.BufferedReader(io.BytesIO(b"12"))
        reader.read(1)
        reader.close()
        self.assertRaises(ValueError, reader.peek)
        self.assertRaises(ValueError, reader.read1, 1)
1201
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001202
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the reader tests against io.BufferedReader and adds checks that
    # are specific to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            expected_errors = (OverflowError, MemoryError, ValueError)
            self.assertRaises(expected_errors,
                              bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialisation the object must refuse to
        # read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            reader = self.tp(rawio)
            reader.f = reader  # reference cycle through the instance
            ref = weakref.ref(reader)
            del reader
            support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1249
1250
class PyBufferedReaderTest(BufferedReaderTest):
    # Same reader tests, bound to the pyio implementation.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001253
Guido van Rossuma9e20242007-03-08 00:43:48 +00001254
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001255class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1256 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001257
def test_constructor(self):
    # __init__() may be called again with valid sizes; non-positive buffer
    # sizes are rejected.  The writer has to keep working afterwards.
    rawio = self.MockRawIO()
    bufio = self.tp(rawio)
    bufio.__init__(rawio)
    for good_size in (1024, 16):
        bufio.__init__(rawio, buffer_size=good_size)
    self.assertEqual(3, bufio.write(b"abc"))
    bufio.flush()
    for bad_size in (0, -16, -1):
        self.assertRaises(ValueError, bufio.__init__, rawio,
                          buffer_size=bad_size)
    bufio.__init__(rawio)
    self.assertEqual(3, bufio.write(b"ghi"))
    bufio.flush()
    self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001273
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001274 def test_uninitialized(self):
1275 bufio = self.tp.__new__(self.tp)
1276 del bufio
1277 bufio = self.tp.__new__(self.tp)
1278 self.assertRaisesRegex((ValueError, AttributeError),
1279 'uninitialized|has no attribute',
1280 bufio.write, b'')
1281 bufio.__init__(self.MockRawIO())
1282 self.assertEqual(bufio.write(b''), 0)
1283
def test_detach_flush(self):
    # Buffered data must reach the raw stream when the writer is detached,
    # and not before.
    raw = self.MockRawIO()
    writer = self.tp(raw)
    writer.write(b"howdy!")
    self.assertFalse(raw._write_stack)
    writer.detach()
    self.assertEqual(raw._write_stack, [b"howdy!"])
1291
def test_write(self):
    # Write to the buffered IO but don't overflow the buffer.
    raw = self.MockRawIO()
    bufio = self.tp(raw, 8)
    bufio.write(b"abc")
    # Nothing may have reached the raw stream yet.
    self.assertFalse(raw._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001298
def test_write_overflow(self):
    # Write 16 bytes in 3-byte slices through an 8-byte buffer.
    raw = self.MockRawIO()
    bufio = self.tp(raw, 8)
    contents = b"abcdefghijklmnop"
    step = 3
    for start in range(0, len(contents), step):
        bufio.write(contents[start:start + step])
    flushed = b"".join(raw._write_stack)
    # At least (total - 8) bytes were implicitly flushed, perhaps more
    # depending on the implementation.
    self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001309
def check_writes(self, intermediate_func):
    # Lots of writes, test the flushed output is as expected.
    # *intermediate_func* is called with the buffered writer after every
    # single write.
    contents = bytes(range(256)) * 1000
    raw = self.MockRawIO()
    bufio = self.tp(raw, 13)
    # Generator of write sizes: repeat each N 15 times then proceed to N+1
    sizes = (size for size in count(1) for _ in range(15))
    offset = 0
    total = len(contents)
    while offset < total:
        size = min(next(sizes), total - offset)
        self.assertEqual(bufio.write(contents[offset:offset + size]), size)
        intermediate_func(bufio)
        offset += size
    bufio.flush()
    self.assertEqual(contents, b"".join(raw._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001329
def test_writes(self):
    # Plain writes: nothing happens between two consecutive writes.
    def do_nothing(bufio):
        return None

    self.check_writes(do_nothing)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001332
def test_writes_and_flushes(self):
    # Flush explicitly after every single write.
    def flush_now(bufio):
        return bufio.flush()

    self.check_writes(flush_now)
Guido van Rossum01a27522007-03-07 01:00:12 +00001335
def test_writes_and_seeks(self):
    # Seek around after every write, always ending at the original
    # position: first with absolute seeks, then with relative ones.
    def _seekabs(bufio):
        here = bufio.tell()
        for target in (here + 1, here - 1, here):
            bufio.seek(target, 0)

    self.check_writes(_seekabs)

    def _seekrel(bufio):
        here = bufio.seek(0, 1)
        for delta in (+1, -1):
            bufio.seek(delta, 1)
        bufio.seek(here, 0)

    self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001349
def test_writes_and_truncates(self):
    # Truncate at the current position after every single write.
    def truncate_here(bufio):
        return bufio.truncate(bufio.tell())

    self.check_writes(truncate_here)
Guido van Rossum01a27522007-03-07 01:00:12 +00001352
def test_write_non_blocking(self):
    # Drives a writer over a raw stream that can be told, via block_on(),
    # to block on a given byte.
    raw = self.MockNonBlockWriterIO()
    bufio = self.tp(raw, 8)

    for payload, accepted in ((b"abcd", 4), (b"efghi", 5)):
        self.assertEqual(bufio.write(payload), accepted)
    # 1 byte will be written, the rest will be buffered
    raw.block_on(b"k")
    self.assertEqual(bufio.write(b"jklmn"), 5)

    # 8 bytes will be written, 8 will be buffered and the rest will be lost
    raw.block_on(b"0")
    try:
        bufio.write(b"opqrwxyz0123456789")
    except self.BlockingIOError as exc:
        written = exc.characters_written
    else:
        self.fail("BlockingIOError should have been raised")
    self.assertEqual(written, 16)
    self.assertEqual(raw.pop_written(),
                     b"abcdefghijklmnopqrwxyz")

    self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
    flushed = raw.pop_written()
    # Previously buffered bytes were flushed
    self.assertTrue(flushed.startswith(b"01234567A"), flushed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001379
def test_write_and_rewind(self):
    # Write, seek back to the start, overwrite the first two bytes, then
    # append; the raw stream is checked before and after the last flush.
    sink = io.BytesIO()
    writer = self.tp(sink, 4)
    self.assertEqual(writer.write(b"abcdef"), 6)
    self.assertEqual(writer.tell(), 6)
    writer.seek(0, 0)
    self.assertEqual(writer.write(b"XY"), 2)
    writer.seek(6, 0)
    self.assertEqual(sink.getvalue(), b"XYcdef")
    self.assertEqual(writer.write(b"123456"), 6)
    writer.flush()
    self.assertEqual(sink.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001392
def test_flush(self):
    # After flush() the first entry of the raw write stack is the data
    # that was written.
    sink = self.MockRawIO()
    buffered = self.tp(sink, 8)
    buffered.write(b"abc")
    buffered.flush()
    first_write = sink._write_stack[0]
    self.assertEqual(b"abc", first_write)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001399
def test_writelines(self):
    # writelines() with a plain list of bytes objects.
    chunks = [b'ab', b'cd', b'ef']
    sink = self.MockRawIO()
    buffered = self.tp(sink, 8)
    buffered.writelines(chunks)
    buffered.flush()
    written = b''.join(sink._write_stack)
    self.assertEqual(written, b'abcdef')
1407
def test_writelines_userlist(self):
    # writelines() with a UserList instead of a plain list.
    chunks = UserList([b'ab', b'cd', b'ef'])
    sink = self.MockRawIO()
    buffered = self.tp(sink, 8)
    buffered.writelines(chunks)
    buffered.flush()
    written = b''.join(sink._write_stack)
    self.assertEqual(written, b'abcdef')
1415
def test_writelines_error(self):
    # Each of these arguments must make writelines() raise TypeError.
    sink = self.MockRawIO()
    buffered = self.tp(sink, 8)
    for bad_lines in ([1, 2, 3], None, 'abc'):
        self.assertRaises(TypeError, buffered.writelines, bad_lines)
1422
def test_destructor(self):
    # Dropping the only reference to the buffered object (followed by a
    # collection) must leave the pending data in the raw write stack.
    sink = self.MockRawIO()
    buffered = self.tp(sink, 8)
    buffered.write(b"abc")
    del buffered
    support.gc_collect()
    first_write = sink._write_stack[0]
    self.assertEqual(b"abc", first_write)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001430
def test_truncate(self):
    # Truncate implicitly flushes the buffer.
    path = support.TESTFN
    with self.open(path, self.write_mode, buffering=0) as raw:
        buffered = self.tp(raw, 8)
        buffered.write(b"abcdef")
        self.assertEqual(buffered.truncate(3), 3)
        self.assertEqual(buffered.tell(), 6)
    with self.open(path, "rb", buffering=0) as check:
        on_disk = check.read()
        self.assertEqual(on_disk, b"abc")
1440
@unittest.skipUnless(threading, 'Threading required for this test.')
@support.requires_resource('cpu')
def test_threads(self):
    try:
        # Write out many bytes from many threads and test they were
        # all flushed.
        N = 1000
        contents = bytes(range(256)) * N
        # Slice the payload into chunks of alternating sizes 1 and 19.
        chunk_sizes = cycle([1, 19])
        offset = 0
        pending = deque()
        while offset < len(contents):
            width = next(chunk_sizes)
            pending.append(contents[offset:offset + width])
            offset += width
        del contents
        # We use a real file object because it allows us to
        # exercise situations where the GIL is released before
        # writing the buffer to the raw streams. This is in addition
        # to concurrency issues due to switching threads in the middle
        # of Python code.
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []

            def worker():
                # Drain the shared queue; record and re-raise any failure.
                try:
                    while True:
                        try:
                            chunk = pending.popleft()
                        except IndexError:
                            return
                        bufio.write(chunk)
                except Exception as exc:
                    errors.append(exc)
                    raise

            threads = [threading.Thread(target=worker) for _ in range(20)]
            with support.start_threads(threads):
                time.sleep(0.02)  # yield
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            bufio.close()
        with self.open(support.TESTFN, "rb") as result:
            data = result.read()
        # Every byte value must appear exactly N times in the file.
        for value in range(256):
            self.assertEqual(data.count(bytes([value])), N)
    finally:
        support.unlink(support.TESTFN)
1488
def test_misbehaved_io(self):
    # seek(), tell() and write() over a MisbehavedRawIO must all raise
    # OSError.
    rawio = self.MisbehavedRawIO()
    bufio = self.tp(rawio, 5)
    with self.assertRaises(OSError):
        bufio.seek(0)
    with self.assertRaises(OSError):
        bufio.tell()
    with self.assertRaises(OSError):
        bufio.write(b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001495
def test_max_buffer_size_removal(self):
    # A third positional argument must be rejected with TypeError.
    with self.assertRaises(TypeError):
        self.tp(
            self.MockRawIO(),
            8,
            12,
        )
Benjamin Peterson59406a92009-03-26 17:10:29 +00001499
def test_write_error_on_close(self):
    # Replace the raw write() with one that always fails, so the flush
    # performed by close() hits an OSError.
    def bad_write(b):
        raise OSError()

    raw = self.MockRawIO()
    raw.write = bad_write
    buffered = self.tp(raw)
    buffered.write(b'spam')
    with self.assertRaises(OSError):  # exception not swallowed
        buffered.close()
    self.assertTrue(buffered.closed)
1509
Benjamin Peterson59406a92009-03-26 17:10:29 +00001510
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriterTest suite against io.BufferedWriter and
    # adds checks specific to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialization with an invalid buffer size,
        # write() must raise ValueError as well.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            buffered = self.tp(rawio)
            buffered.write(b"123xxx")
            buffered.x = buffered  # reference cycle through the object itself
            wr = weakref.ref(buffered)
            del buffered
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as check:
            self.assertEqual(check.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1554
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001555
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriterTest suite against the pyio
    # implementation of BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001558
class BufferedRWPairTest(unittest.TestCase):
    # Shared tests for BufferedRWPair; subclasses bind the implementation
    # under test to the ``tp`` attribute.

    def test_constructor(self):
        reader, writer = self.MockRawIO(), self.MockRawIO()
        pair = self.tp(reader, writer)
        self.assertFalse(pair.closed)

    def test_uninitialized(self):
        # Creating and dropping an object that never ran __init__ must work.
        pair = self.tp.__new__(self.tp)
        del pair
        pair = self.tp.__new__(self.tp)
        # Before __init__, read() and write() must fail with one of these.
        errors = (ValueError, AttributeError)
        pattern = 'uninitialized|has no attribute'
        for method_name, argument in (('read', 0), ('write', b'')):
            self.assertRaisesRegex(errors, pattern,
                                   getattr(pair, method_name), argument)
        # Once initialized, the same calls succeed.
        pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(pair.read(0), b'')
        self.assertEqual(pair.write(b''), 0)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        with self.assertRaises(self.UnsupportedOperation):
            pair.detach()

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument must be rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        for size, expected in ((3, b"abc"), (1, b"d")):
            self.assertEqual(pair.read(size), expected)
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # Each check gets a fresh pair over the same data.
        def make_pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        all_lines = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(make_pair().readlines(), all_lines)
        self.assertEqual(make_pair().readlines(), all_lines)
        self.assertEqual(make_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        target = bytearray(5)
        self.assertEqual(pair.readinto(target), 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), sink)

        # Each write is flushed on its own, so each shows up as one entry.
        for chunk in (b"abc", b"def"):
            pair.write(chunk)
            pair.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        peeked = pair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_reader_close_error_on_close(self):
        # The reader's close() raises NameError by touching an undefined name.
        def reader_close():
            reader_non_existing

        reader = self.MockRawIO()
        writer = self.MockRawIO()
        reader.close = reader_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('reader_non_existing', str(err.exception))
        self.assertTrue(pair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

    def test_writer_close_error_on_close(self):
        # The writer's close() raises NameError by touching an undefined name.
        def writer_close():
            writer_non_existing

        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('writer_non_existing', str(err.exception))
        self.assertFalse(pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

    def test_reader_writer_close_error_on_close(self):
        # Both close() methods fail; the reader's error is the one raised
        # and the writer's error is expected as its __context__.
        def reader_close():
            reader_non_existing

        def writer_close():
            writer_non_existing

        reader = self.MockRawIO()
        reader.close = reader_close
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        raised = err.exception
        self.assertIn('reader_non_existing', str(raised))
        self.assertIsInstance(raised.__context__, NameError)
        self.assertIn('writer_non_existing', str(raised.__context__))
        self.assertFalse(pair.closed)
        self.assertFalse(reader.closed)
        self.assertFalse(writer.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # (reader isatty, writer isatty, expected truthiness of pair.isatty())
        combinations = (
            (False, False, False),
            (True, False, True),
            (False, True, True),
            (True, True, True),
        )
        for reader_tty, writer_tty, expected in combinations:
            pair = self.tp(SelectableIsAtty(reader_tty),
                           SelectableIsAtty(writer_tty))
            check = self.assertTrue if expected else self.assertFalse
            check(pair.isatty())

    def test_weakref_clearing(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        watcher = weakref.ref(pair)
        # Drop the object first, then the weak reference to it.
        pair = None
        watcher = None  # Shouldn't segfault.
1742
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest suite against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001745
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest suite against the pyio
    # implementation of BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001748
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001749
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Combines the reader and writer suites and adds tests that mix reads,
    # writes and seeks on the same buffered object.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_constructor(self)

    def test_uninitialized(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        for chunk in (b"ddd", b"eee"):
            rw.write(chunk)
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        # Seek relative to the end, then relative to the current position.
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario; read_func(bufio[, n]) is the read primitive
        # under test.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)

        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            capacity = n if n >= 0 else 9999
            target = bytearray(capacity)
            filled = bufio.readinto(target)
            return bytes(target[:filled])

        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # Advance past what was peeked so the helper acts like a read.
            bufio.seek(len(data), 1)
            return data

        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        for chunk in (b"123", b"45"):
            bufio.write(chunk)
            bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_threads(self)

    def test_writes_and_peek(self):
        # First variant: peek at the current position.
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)

        # Second variant: step back one byte, peek, then restore the position.
        def _peek(bufio):
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        # Step back one byte and read it again between writes.
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)

        self.check_writes(_read)

    def test_writes_and_read1s(self):
        # Same as above, using read1().
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)

        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        # Same as above, using readinto() with a one-byte target.
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))

        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            end_position = overwrite_size + 1
            self.assertEqual(bufio.tell(), end_position)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), end_position)
            expected = (b"A" + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            self.assertEqual(raw.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for first in range(0, size):
            for second in range(first, size):
                raw = self.BytesIO(original)
                bufio = self.tp(raw, 100)
                mutate(bufio, first, second)
                bufio.flush()
                # Byte ``second`` becomes 2 and byte ``first`` becomes 1;
                # when both positions coincide the later write (1) wins.
                expected = bytearray(original)
                expected[second] = 2
                expected[first] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (first, second))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            target = bytearray(1)
            f.readinto(target)
            self.assertEqual(target, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Alternate one-byte writes with readline() calls on the same stream.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as f:
            steps = ((b'1', b'b\n'), (b'2', b'def\n'), (b'3', b'\n'))
            for written, expected_line in steps:
                f.write(written)
                self.assertEqual(f.readline(), expected_line)
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1977
R David Murray67bfe802013-02-23 21:51:05 -05001978
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Runs the shared BufferedRandomTest suite against io.BufferedRandom and
    # adds checks specific to that implementation.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the garbage-collection checks of both C test classes.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
2000
2001
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandomTest suite against the pyio
    # implementation of BufferedRandom.
    tp = pyio.BufferedRandom
2004
2005
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002006# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2007# properties:
2008# - A single output character can correspond to many bytes of input.
2009# - The number of input bytes to complete the character can be
2010# undetermined until the last input byte is received.
2011# - The number of input bytes can vary depending on previous input.
2012# - A single input byte can correspond to many characters of output.
2013# - The number of output characters can be undetermined until the
2014# last input byte is received.
2015# - The number of output characters can vary depending on previous input.
2016
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Restore the initial state: I = 1, O = 1 and an empty buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags), flags encoding I and O as one int."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Decode the bytes of *input* and return the resulting text.

        Incomplete words stay in the buffer until a later call completes
        them, or until *final* is true, which terminates the last word.
        """
        period = ord('.')
        pieces = []  # joined once at the end rather than grown with +=
        for b in input:
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer: # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i...' and 'o...' words update I or O and produce no output; any
        other word is padded or truncated to O characters (when O is
        non-zero) and followed by a period.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Lookups are answered only while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: CodecInfo for 'test_decoder', else None."""
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
2101
# Register the lookup function of the decoder defined above as a codec
# search function.  It is disabled by default (codecEnabled is False);
# tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2105
2106
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for payload, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(payload, final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002149
2150class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002151
def setUp(self):
    """Remove any stale test file and prepare the mixed-newline sample data."""
    support.unlink(support.TESTFN)
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    # The same text with every line ending normalised to "\n".
    self.normalized = "AAA\nBBB\nCCC\nDDD\nEEE\n"
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002156
def tearDown(self):
    """Delete the scratch file that the tests may have created."""
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002159
def test_constructor(self):
    """Re-running __init__ must update the settings and reject bad newlines."""
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    wrapper = self.TextIOWrapper(buffered)

    wrapper.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(wrapper.encoding, "latin-1")
    self.assertEqual(wrapper.line_buffering, False)

    wrapper.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(wrapper.encoding, "utf-8")
    self.assertEqual(wrapper.line_buffering, True)
    self.assertEqual("\xe9\n", wrapper.readline())

    # newline must be a string, and one of the recognised values.
    with self.assertRaises(TypeError):
        wrapper.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        wrapper.__init__(buffered, newline='xyzzy')
2173
def test_uninitialized(self):
    """An object built with __new__ only must fail cleanly until initialised."""
    wrapper_cls = self.TextIOWrapper

    # Creating and dropping an uninitialised instance must be harmless.
    bare = wrapper_cls.__new__(wrapper_cls)
    del bare

    bare = wrapper_cls.__new__(wrapper_cls)
    with self.assertRaises(Exception):
        repr(bare)
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        bare.read(0)

    # After a proper __init__ the object behaves normally.
    bare.__init__(self.MockRawIO())
    self.assertEqual(bare.read(0), '')
2184
def test_non_text_encoding_codecs_are_rejected(self):
    """The constructor must refuse codecs not marked as text encodings.

    See http://bugs.python.org/issue20404
    """
    buffered = self.BufferedWriter(self.BytesIO())
    with self.assertRaisesRegex(LookupError, "is not a text encoding"):
        self.TextIOWrapper(buffered, encoding="hex")
2193
def test_detach(self):
    """detach() returns the buffer, flushes pending text and works only once."""
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)

    wrapper = self.TextIOWrapper(buffered)
    self.assertIs(wrapper.detach(), buffered)

    wrapper = self.TextIOWrapper(buffered, encoding="ascii")
    wrapper.write("howdy")
    # Nothing has reached the raw stream before detaching.
    self.assertFalse(raw.getvalue())
    wrapper.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    with self.assertRaises(ValueError):
        wrapper.detach()

    # Operations independent of the detached stream should still work
    repr(wrapper)
    self.assertEqual(wrapper.encoding, "ascii")
    self.assertEqual(wrapper.errors, "strict")
    self.assertFalse(wrapper.line_buffering)
2212
def test_repr(self):
    """repr() reflects name, mode and encoding as they become available."""
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    wrapper = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)

    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)

    wrapper.mode = "r"
    expected = ("<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(wrapper), expected)

    raw.name = b"dummy"
    expected = ("<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(wrapper), expected)

    wrapper.buffer.detach()
    repr(wrapper)  # Should not raise an exception
2232
def test_line_buffering(self):
    """With line_buffering=True a write containing a line ending is flushed."""
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    wrapper = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)

    wrapper.write("X")
    self.assertEqual(raw.getvalue(), b"")  # No flush happened

    wrapper.write("Y\nZ")
    self.assertEqual(raw.getvalue(), b"XY\nZ")  # All got flushed

    wrapper.write("A\rB")
    self.assertEqual(raw.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002243
def test_default_encoding(self):
    """Without an explicit encoding the current locale encoding is used."""
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for variable in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(variable, None)

        expected_encoding = locale.getpreferredencoding(False)
        wrapper = self.TextIOWrapper(self.BytesIO())
        self.assertEqual(wrapper.encoding, expected_encoding)
    finally:
        # Put the environment back exactly as it was found.
        os.environ.clear()
        os.environ.update(saved_environ)
2261
@support.cpython_only
def test_device_encoding(self):
    """A fileno() that does not fit a C int must raise OverflowError.

    Issue 15989
    """
    import _testcapi
    stream = self.BytesIO()
    for oversized in (_testcapi.INT_MAX + 1, _testcapi.UINT_MAX + 1):
        # Bind the value as a default so each lambda reports its own number.
        stream.fileno = lambda value=oversized: value
        with self.assertRaises(OverflowError):
            self.TextIOWrapper(stream)
2271
def test_encoding(self):
    """Check the encoding attribute is always set, and valid."""
    stream = self.BytesIO()

    explicit = self.TextIOWrapper(stream, encoding="utf-8")
    self.assertEqual(explicit.encoding, "utf-8")

    implicit = self.TextIOWrapper(stream)
    self.assertIsNotNone(implicit.encoding)
    # lookup() raises LookupError if the reported name is not a real codec.
    codecs.lookup(implicit.encoding)
2280
def test_encoding_errors_reading(self):
    """Exercise each error handler when decoding a non-ASCII byte."""
    payload = b"abc\n\xff\n"

    def ascii_reader(**options):
        # Fresh wrapper over a fresh copy of the payload for every case.
        return self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                                  **options)

    # (1) default
    reader = ascii_reader()
    with self.assertRaises(UnicodeError):
        reader.read()
    # (2) explicit strict
    reader = ascii_reader(errors="strict")
    with self.assertRaises(UnicodeError):
        reader.read()
    # (3) ignore
    reader = ascii_reader(errors="ignore")
    self.assertEqual(reader.read(), "abc\n\n")
    # (4) replace
    reader = ascii_reader(errors="replace")
    self.assertEqual(reader.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002298
def test_encoding_errors_writing(self):
    """Exercise each error handler when encoding a non-ASCII character."""
    # (1) default
    writer = self.TextIOWrapper(self.BytesIO(), encoding="ascii")
    with self.assertRaises(UnicodeError):
        writer.write("\xff")

    # (2) explicit strict
    writer = self.TextIOWrapper(self.BytesIO(), encoding="ascii",
                                errors="strict")
    with self.assertRaises(UnicodeError):
        writer.write("\xff")

    # (3) ignore drops the character, (4) replace substitutes "?"
    for handler, expected in (("ignore", b"abcdef\n"),
                              ("replace", b"abc?def\n")):
        sink = self.BytesIO()
        writer = self.TextIOWrapper(sink, encoding="ascii", errors=handler,
                                    newline="\n")
        writer.write("abc\xffdef\n")
        writer.flush()
        self.assertEqual(sink.getvalue(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002322
def test_newlines(self):
    """Check line splitting for each newline mode, encoding and buffer size."""
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

    # [newline argument, lines expected back from the wrapper]
    tests = [
        [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
        ['', input_lines],
        ['\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]],
        ['\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]],
        ['\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    def read_in_pieces(textio):
        # Alternate a two-character read() with a readline() until EOF.
        pieces = []
        while True:
            head = textio.read(2)
            if head == '':
                return pieces
            self.assertEqual(len(head), 2)
            pieces.append(head + textio.readline())

    text = ''.join(input_lines)

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = text.encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        got_lines = read_in_pieces(textio)
                    else:
                        got_lines = list(textio)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002364
def test_newlines_input(self):
    """readlines() and read() must honour each newline mode on input."""
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    cases = [
        (None, normalized.decode("ascii").splitlines(keepends=True)),
        ("", testdata.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        wrapper = self.TextIOWrapper(self.BytesIO(testdata),
                                     encoding="ascii", newline=newline)
        self.assertEqual(wrapper.readlines(), expected)
        # A whole-file read must give the same text as the joined lines.
        wrapper.seek(0)
        self.assertEqual(wrapper.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002380
def test_newlines_output(self):
    """Written "\\n" characters must be translated per the newline mode."""
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to behave like the platform's os.linesep.
    tests = [(None, testdict[os.linesep])]
    tests.extend(sorted(testdict.items()))

    for newline, expected in tests:
        sink = self.BytesIO()
        wrapper = self.TextIOWrapper(sink, encoding="ascii", newline=newline)
        for fragment in ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ"):
            wrapper.write(fragment)
        wrapper.flush()
        self.assertEqual(sink.closed, False)
        self.assertEqual(sink.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002398
def test_destructor(self):
    """Dropping the wrapper must flush pending text and close the stream."""
    seen_at_close = []
    base = self.BytesIO

    class MyBytesIO(base):
        def close(self):
            # Record what had been written by the time close() runs.
            seen_at_close.append(self.getvalue())
            base.close(self)

    stream = MyBytesIO()
    wrapper = self.TextIOWrapper(stream, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002412
def test_override_destructor(self):
    """Overridden __del__, close and flush must run in that order."""
    record = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            # The base class may or may not define __del__.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            record.append(2)
            super().close()

        def flush(self):
            record.append(3)
            super().flush()

    stream = self.BytesIO()
    wrapper = MyTextIO(stream, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
2435
def test_error_through_destructor(self):
    """A failing close() in the destructor must not mask the real exception."""
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)

    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception OSError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00002450
Guido van Rossum9b76da62007-04-11 01:09:03 +00002451 # Systematic tests of the text I/O API
2452
def test_basic_io(self):
    """Write, read, seek and tell on a real file for several chunk sizes.

    Both files are opened in ``with`` blocks so that a failing assertion
    cannot leave the handle open and block removal of the test file.
    """
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # "utf-16-be" and "utf-16-le" are not exercised here.
        for enc in "ascii", "latin-1", "utf-8":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)

            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                cookie = f.tell()  # position at end of "abc"
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2481
def multi_line_test(self, f, enc):
    """Write lines of many lengths to *f* and read them back with positions.

    Every line is recorded together with the tell() value taken just before
    it was written; reading back must give the same (position, line) pairs.
    """
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    sizes = (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000)

    written = []
    for size in sizes:
        # Cycle through the sample characters to build a line of this size.
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        written.append((f.tell(), line))
        f.write(line)

    f.seek(0)
    read_back = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        read_back.append((pos, line))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002503
def test_telling(self):
    """tell() must agree between writing and reading.

    It must also raise OSError while the file is being iterated. The file
    is opened in a ``with`` block so a failing assertion cannot leak it.
    """
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        # Record the position before and after each of two written lines.
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()

        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)

        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is refused while iteration is in progress.
            self.assertRaises(OSError, f.tell)
        self.assertEqual(f.tell(), p2)
2523
def test_seeking(self):
    """tell() after a partial read, then readline() over a multi-byte char.

    The ASCII prefix is two characters shorter than the value returned by
    _default_chunk_size(); a non-ASCII character follows it.
    """
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")

    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002540
def test_seeking_too(self):
    """Regression test for a specific bug: tell() after a tiny-chunk readline."""
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as out:
        out.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as text:
        text._CHUNK_SIZE  # Just test that it exists
        text._CHUNK_SIZE = 2
        text.readline()
        text.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002551
def test_seek_and_tell(self):
    """Test seek/tell using the StatefulIncrementalDecoder.

    Every file is opened in a ``with`` block so that a failing assertion
    cannot leak a handle, and the decoder is always disabled again on exit.
    """
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        # NOTE(review): the files re-opened below keep their default
        # _CHUNK_SIZE; only the full read above uses CHUNK_SIZE. Confirm
        # that this is intended.
        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = True

    # Run the tests.
    try:
        # Try each test case.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(case_input)

        # Position each test case so that it crosses a chunk boundary.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(case_input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + case_input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = False
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002598
def test_encoded_writes(self):
    """Two consecutive writes must produce a single BOM (see issue1753)."""
    data = "1234567890"
    doubled = data * 2
    encodings = (
        "utf-16", "utf-16-le", "utf-16-be",
        "utf-32", "utf-32-le", "utf-32-be",
    )
    for encoding in encodings:
        sink = self.BytesIO()
        wrapper = self.TextIOWrapper(sink, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        wrapper.write(data)
        wrapper.write(data)
        # Read the content back twice from the start.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        self.assertEqual(sink.getvalue(), doubled.encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002618
def test_unreadable(self):
    """read() on a wrapper over a non-readable stream raises OSError."""
    class UnReadable(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(UnReadable())
    with self.assertRaises(OSError):
        wrapper.read()
Benjamin Petersona1b49012009-03-31 23:11:32 +00002625
def test_read_one_by_one(self):
    """Reading one character at a time still translates "\\r\\n" to "\\n"."""
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # Keep calling read(1) until it returns the empty string at EOF.
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002635
def test_readlines(self):
    """readlines() with no hint, a None hint and a small size hint."""
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    all_lines = ["AA\n", "BB\n", "CC"]

    self.assertEqual(wrapper.readlines(), all_lines)
    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(None), all_lines)
    wrapper.seek(0)
    # With a hint of 5 only the first two lines come back.
    self.assertEqual(wrapper.readlines(5), ["AA\n", "BB\n"])
2643
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002644 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    """Read 128 characters at a time across a "\\r\\n" pair."""
    # make sure "\r\n" straddles 128 char boundary.
    wrapper = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    # Keep calling read(128) until it returns the empty string at EOF.
    collected = "".join(iter(lambda: wrapper.read(128), ""))
    self.assertEqual(collected, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002655
def test_writelines(self):
    """writelines() writes each string of a list with nothing in between."""
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(['ab', 'cd', 'ef'])
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2663
def test_writelines_userlist(self):
    """writelines() accepts a UserList as well as a plain list."""
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(UserList(['ab', 'cd', 'ef']))
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2671
def test_writelines_error(self):
    """writelines() raises TypeError for ints, None and bytes."""
    wrapper = self.TextIOWrapper(self.BytesIO())
    for bad_argument in ([1, 2, 3], None, b'abc'):
        with self.assertRaises(TypeError):
            wrapper.writelines(bad_argument)
2677
def test_issue1395_1(self):
    """Reading one character at a time yields the normalised text."""
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")

    # read one char at a time
    collected = "".join(iter(lambda: wrapper.read(1), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002689
def test_issue1395_2(self):
    """Reading four characters at a time with a 4-character chunk size."""
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Keep calling read(4) until it returns the empty string at EOF.
    collected = "".join(iter(lambda: wrapper.read(4), ""))
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002701
def test_issue1395_3(self):
    """Mixing read(4) and readline() still yields the normalised text."""
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Two fixed-size reads followed by three line reads, in that order.
    pieces = [wrapper.read(4), wrapper.read(4)]
    for _ in range(3):
        pieces.append(wrapper.readline())
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002712
def test_issue1395_4(self):
    """A read(4) followed by read() of the rest yields the normalised text."""
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    first = wrapper.read(4)
    rest = wrapper.read()
    self.assertEqual(first + rest, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002720
def test_issue1395_5(self):
    """Seeking back to a tell() cookie resumes reading at the same place."""
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    wrapper.read(4)  # consume the first four characters
    cookie = wrapper.tell()
    wrapper.seek(0)
    wrapper.seek(cookie)
    self.assertEqual(wrapper.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002730
def test_issue2282(self):
    """The wrapper reports the same seekable() value as its buffer."""
    underlying = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(underlying, encoding="ascii")

    self.assertEqual(underlying.seekable(), wrapper.seekable())
2736
Antoine Pitroue4501852009-05-14 18:55:55 +00002737 def test_append_bom(self):
2738 # The BOM is not written again when appending to a non-empty file
2739 filename = support.TESTFN
2740 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2741 with self.open(filename, 'w', encoding=charset) as f:
2742 f.write('aaa')
2743 pos = f.tell()
2744 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002745 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002746
2747 with self.open(filename, 'a', encoding=charset) as f:
2748 f.write('xxx')
2749 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002750 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002751
2752 def test_seek_bom(self):
2753 # Same test, but when seeking manually
2754 filename = support.TESTFN
2755 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2756 with self.open(filename, 'w', encoding=charset) as f:
2757 f.write('aaa')
2758 pos = f.tell()
2759 with self.open(filename, 'r+', encoding=charset) as f:
2760 f.seek(pos)
2761 f.write('zzz')
2762 f.seek(0)
2763 f.write('bbb')
2764 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002765 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002766
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02002767 def test_seek_append_bom(self):
2768 # Same test, but first seek to the start and then to the end
2769 filename = support.TESTFN
2770 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2771 with self.open(filename, 'w', encoding=charset) as f:
2772 f.write('aaa')
2773 with self.open(filename, 'a', encoding=charset) as f:
2774 f.seek(0)
2775 f.seek(0, self.SEEK_END)
2776 f.write('xxx')
2777 with self.open(filename, 'rb') as f:
2778 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2779
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002780 def test_errors_property(self):
2781 with self.open(support.TESTFN, "w") as f:
2782 self.assertEqual(f.errors, "strict")
2783 with self.open(support.TESTFN, "w", errors="replace") as f:
2784 self.assertEqual(f.errors, "replace")
2785
Brett Cannon31f59292011-02-21 19:29:56 +00002786 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002787 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002788 def test_threads_write(self):
2789 # Issue6750: concurrent writes could duplicate data
2790 event = threading.Event()
2791 with self.open(support.TESTFN, "w", buffering=1) as f:
2792 def run(n):
2793 text = "Thread%03d\n" % n
2794 event.wait()
2795 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03002796 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002797 for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03002798 with support.start_threads(threads, event.set):
2799 time.sleep(0.02)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002800 with self.open(support.TESTFN) as f:
2801 content = f.read()
2802 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002803 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002804
Antoine Pitrou6be88762010-05-03 16:48:20 +00002805 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002806 # Test that text file is closed despite failed flush
2807 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00002808 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002809 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00002810 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002811 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002812 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002813 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002814 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002815 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002816 self.assertTrue(txt.buffer.closed)
2817 self.assertTrue(closed) # flush() called
2818 self.assertFalse(closed[0]) # flush() called before file closed
2819 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02002820 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00002821
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03002822 def test_close_error_on_close(self):
2823 buffer = self.BytesIO(self.testdata)
2824 def bad_flush():
2825 raise OSError('flush')
2826 def bad_close():
2827 raise OSError('close')
2828 buffer.close = bad_close
2829 txt = self.TextIOWrapper(buffer, encoding="ascii")
2830 txt.flush = bad_flush
2831 with self.assertRaises(OSError) as err: # exception not swallowed
2832 txt.close()
2833 self.assertEqual(err.exception.args, ('close',))
2834 self.assertIsInstance(err.exception.__context__, OSError)
2835 self.assertEqual(err.exception.__context__.args, ('flush',))
2836 self.assertFalse(txt.closed)
2837
2838 def test_nonnormalized_close_error_on_close(self):
2839 # Issue #21677
2840 buffer = self.BytesIO(self.testdata)
2841 def bad_flush():
2842 raise non_existing_flush
2843 def bad_close():
2844 raise non_existing_close
2845 buffer.close = bad_close
2846 txt = self.TextIOWrapper(buffer, encoding="ascii")
2847 txt.flush = bad_flush
2848 with self.assertRaises(NameError) as err: # exception not swallowed
2849 txt.close()
2850 self.assertIn('non_existing_close', str(err.exception))
2851 self.assertIsInstance(err.exception.__context__, NameError)
2852 self.assertIn('non_existing_flush', str(err.exception.__context__))
2853 self.assertFalse(txt.closed)
2854
Antoine Pitrou6be88762010-05-03 16:48:20 +00002855 def test_multi_close(self):
2856 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2857 txt.close()
2858 txt.close()
2859 txt.close()
2860 self.assertRaises(ValueError, txt.flush)
2861
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002862 def test_unseekable(self):
2863 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2864 self.assertRaises(self.UnsupportedOperation, txt.tell)
2865 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2866
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002867 def test_readonly_attributes(self):
2868 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2869 buf = self.BytesIO(self.testdata)
2870 with self.assertRaises(AttributeError):
2871 txt.buffer = buf
2872
Antoine Pitroue96ec682011-07-23 21:46:35 +02002873 def test_rawio(self):
2874 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2875 # that subprocess.Popen() can have the required unbuffered
2876 # semantics with universal_newlines=True.
2877 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2878 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2879 # Reads
2880 self.assertEqual(txt.read(4), 'abcd')
2881 self.assertEqual(txt.readline(), 'efghi\n')
2882 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2883
2884 def test_rawio_write_through(self):
2885 # Issue #12591: with write_through=True, writes don't need a flush
2886 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2887 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2888 write_through=True)
2889 txt.write('1')
2890 txt.write('23\n4')
2891 txt.write('5')
2892 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2893
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02002894 def test_bufio_write_through(self):
2895 # Issue #21396: write_through=True doesn't force a flush()
2896 # on the underlying binary buffered object.
2897 flush_called, write_called = [], []
2898 class BufferedWriter(self.BufferedWriter):
2899 def flush(self, *args, **kwargs):
2900 flush_called.append(True)
2901 return super().flush(*args, **kwargs)
2902 def write(self, *args, **kwargs):
2903 write_called.append(True)
2904 return super().write(*args, **kwargs)
2905
2906 rawio = self.BytesIO()
2907 data = b"a"
2908 bufio = BufferedWriter(rawio, len(data)*2)
2909 textio = self.TextIOWrapper(bufio, encoding='ascii',
2910 write_through=True)
2911 # write to the buffered io but don't overflow the buffer
2912 text = data.decode('ascii')
2913 textio.write(text)
2914
2915 # buffer.flush is not called with write_through=True
2916 self.assertFalse(flush_called)
2917 # buffer.write *is* called with write_through=True
2918 self.assertTrue(write_called)
2919 self.assertEqual(rawio.getvalue(), b"") # no flush
2920
2921 write_called = [] # reset
2922 textio.write(text * 10) # total content is larger than bufio buffer
2923 self.assertTrue(write_called)
2924 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
2925
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002926 def test_read_nonbytes(self):
2927 # Issue #17106
2928 # Crash when underlying read() returns non-bytes
2929 t = self.TextIOWrapper(self.StringIO('a'))
2930 self.assertRaises(TypeError, t.read, 1)
2931 t = self.TextIOWrapper(self.StringIO('a'))
2932 self.assertRaises(TypeError, t.readline)
2933 t = self.TextIOWrapper(self.StringIO('a'))
2934 self.assertRaises(TypeError, t.read)
2935
2936 def test_illegal_decoder(self):
2937 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10002938 # Bypass the early encoding check added in issue 20404
2939 def _make_illegal_wrapper():
2940 quopri = codecs.lookup("quopri")
2941 quopri._is_text_encoding = True
2942 try:
2943 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2944 newline='\n', encoding="quopri")
2945 finally:
2946 quopri._is_text_encoding = False
2947 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002948 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10002949 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002950 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10002951 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002952 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10002953 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002954 self.assertRaises(TypeError, t.read)
2955
Antoine Pitrou712cb732013-12-21 15:51:54 +01002956 def _check_create_at_shutdown(self, **kwargs):
2957 # Issue #20037: creating a TextIOWrapper at shutdown
2958 # shouldn't crash the interpreter.
2959 iomod = self.io.__name__
2960 code = """if 1:
2961 import codecs
2962 import {iomod} as io
2963
2964 # Avoid looking up codecs at shutdown
2965 codecs.lookup('utf-8')
2966
2967 class C:
2968 def __init__(self):
2969 self.buf = io.BytesIO()
2970 def __del__(self):
2971 io.TextIOWrapper(self.buf, **{kwargs})
2972 print("ok")
2973 c = C()
2974 """.format(iomod=iomod, kwargs=kwargs)
2975 return assert_python_ok("-c", code)
2976
2977 def test_create_at_shutdown_without_encoding(self):
2978 rc, out, err = self._check_create_at_shutdown()
2979 if err:
2980 # Can error out with a RuntimeError if the module state
2981 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10002982 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01002983 else:
2984 self.assertEqual("ok", out.decode().strip())
2985
2986 def test_create_at_shutdown_with_encoding(self):
2987 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
2988 errors='strict')
2989 self.assertFalse(err)
2990 self.assertEqual("ok", out.decode().strip())
2991
Antoine Pitroub8503892014-04-29 10:14:02 +02002992 def test_read_byteslike(self):
2993 r = MemviewBytesIO(b'Just some random string\n')
2994 t = self.TextIOWrapper(r, 'utf-8')
2995
2996 # TextIOwrapper will not read the full string, because
2997 # we truncate it to a multiple of the native int size
2998 # so that we can construct a more complex memoryview.
2999 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3000
3001 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3002
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003003 def test_issue22849(self):
3004 class F(object):
3005 def readable(self): return True
3006 def writable(self): return True
3007 def seekable(self): return True
3008
3009 for i in range(10):
3010 try:
3011 self.TextIOWrapper(F(), encoding='utf-8')
3012 except Exception:
3013 pass
3014
3015 F.tell = lambda x: 0
3016 t = self.TextIOWrapper(F(), encoding='utf-8')
3017
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003018
class MemviewBytesIO(io.BytesIO):
    '''A BytesIO object whose read method returns memoryviews
       rather than bytes'''

    # Both methods default *len_* to -1, matching the optional size
    # argument of BytesIO.read(), so that calling them without an argument
    # no longer raises TypeError.  Passing a size explicitly works exactly
    # as before.

    def read1(self, len_=-1):
        # Delegate to BytesIO.read1() and convert the bytes it returns.
        return _to_memoryview(super().read1(len_))

    def read(self, len_=-1):
        # Delegate to BytesIO.read() and convert the bytes it returns.
        return _to_memoryview(super().read(len_))
3028
def _to_memoryview(buf):
    '''Convert bytes-object *buf* to a non-trivial memoryview'''

    ints = array.array('i')
    # Only a whole number of array items can be loaded, so any trailing
    # bytes that do not fill a complete item are dropped.
    whole_items = len(buf) // ints.itemsize
    usable = whole_items * ints.itemsize
    ints.frombytes(buf[:usable])
    return memoryview(ints)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003036
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003037
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the TextIOWrapperTest suite with the "io" module as self.io and
    # adds tests that are only defined for this variant.
    io = io
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        # A failed re-initialisation leaves the wrapper unusable, and an
        # object created with __new__ alone cannot be repr()'d.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        bare = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(bare)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(raw)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference: the wrapper can only be reclaimed by the
            # cycle collector.
            wrapper.x = wrapper
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as reader:
            self.assertEqual(reader.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
3082
3083
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the TextIOWrapperTest suite with the "pyio" module as self.io.
    io = pyio
    # Alternative message kept for reference:
    #     "LookupError: unknown encoding: ascii"
    shutdown_error = "TypeError: 'NoneType' object is not iterable"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003088
3089
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Tests for self.IncrementalNewlineDecoder; that attribute is not
    # defined in this class and has to be provided from outside it.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(data, expected, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # the same input is decoded twice from the same saved state.
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(data, **kwargs), expected)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(data, **kwargs), expected)

        def _check_sequence(cases):
            # Each case is (input bytes, expected text, decode() kwargs).
            for data, expected, kwargs in cases:
                _check_decode(data, expected, **kwargs)

        final = {"final": True}

        _check_sequence((
            (b'\xe8\xa2\x88', "\u8888", {}),

            (b'\xe8', "", {}),
            (b'\xa2', "", {}),
            (b'\x88', "\u8888", {}),

            (b'\xe8', "", {}),
            (b'\xa2', "", {}),
            (b'\x88', "\u8888", {}),

            (b'\xe8', "", {}),
        ))
        # An incomplete sequence is pending, so finalising must fail.
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_sequence((
            (b'\n', "\n", {}),
            (b'\r', "", {}),
            (b'', "\n", final),
            (b'\r', "\n", final),

            (b'\r', "", {}),
            (b'a', "\na", {}),

            (b'\r\r\n', "\n\n", {}),
            (b'\r', "", {}),
            (b'\r', "\n", {}),
            (b'\na', "\na", {}),

            (b'\xe8\xa2\x88\r\n', "\u8888\n", {}),
            (b'\xe8\xa2\x88', "\u8888", {}),
            (b'\n', "\n", {}),
            (b'\xe8\xa2\x88\r', "\u8888", {}),
            (b'\n', "\n", {}),
        ))

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to *decoder* one unit at a time and check both the
        # translated output and the "newlines" attribute along the way.
        # With an *encoding* the units are single encoded bytes; with
        # encoding=None they are single characters.
        decoded = []
        if encoding is None:
            encoder = None

            def _units(text):
                # Decode one char at a time
                return list(text)
        else:
            encoder = codecs.getincrementalencoder(encoding)()

            def _units(text):
                # Decode one byte at a time
                return [bytes([byte]) for byte in encoder.encode(text)]

        def _decode_bytewise(text):
            for unit in _units(text):
                decoded.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        # (text to feed, expected value of decoder.newlines afterwards)
        steps = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, newlines_seen in steps:
            _decode_bytewise(text)
            self.assertEqual(decoder.newlines, newlines_seen)
        _decode_bytewise("abc\r")
        self.assertEqual("".join(decoded), "abc\n\nabcabc\nabcabc")

        # After reset() the decoder reports no newlines seen.
        decoder.reset()
        sample = "abc"
        if encoder is not None:
            encoder.reset()
            sample = encoder.encode(sample)
        self.assertEqual(decoder.decode(sample), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc:
                inner = codecs.getincrementaldecoder(enc)()
            else:
                inner = enc
            decoder = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(decoder, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 must pass through unchanged and must not
            # be recorded as newlines.
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)

        for translate in (False, True):
            _check(self.IncrementalNewlineDecoder(None, translate=translate))
3195
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; every test is inherited from
    # IncrementalNewlineDecoderTest.
    pass
3198
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; every test is inherited from
    # IncrementalNewlineDecoderTest.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003201
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003202
Guido van Rossum01a27522007-03-07 01:00:12 +00003203# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003204
Guido van Rossum5abbf752007-08-27 17:39:33 +00003205class MiscIOTest(unittest.TestCase):
3206
Barry Warsaw40e82462008-11-20 20:14:50 +00003207 def tearDown(self):
3208 support.unlink(support.TESTFN)
3209
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003210 def test___all__(self):
3211 for name in self.io.__all__:
3212 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003213 self.assertIsNotNone(obj, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003214 if name == "open":
3215 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003216 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003217 self.assertTrue(issubclass(obj, Exception), name)
3218 elif not name.startswith("SEEK_"):
3219 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003220
Barry Warsaw40e82462008-11-20 20:14:50 +00003221 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003222 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003223 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003224 f.close()
3225
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003226 with support.check_warnings(('', DeprecationWarning)):
3227 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003228 self.assertEqual(f.name, support.TESTFN)
3229 self.assertEqual(f.buffer.name, support.TESTFN)
3230 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3231 self.assertEqual(f.mode, "U")
3232 self.assertEqual(f.buffer.mode, "rb")
3233 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003234 f.close()
3235
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003236 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003237 self.assertEqual(f.mode, "w+")
3238 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3239 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003240
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003241 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003242 self.assertEqual(g.mode, "wb")
3243 self.assertEqual(g.raw.mode, "wb")
3244 self.assertEqual(g.name, f.fileno())
3245 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003246 f.close()
3247 g.close()
3248
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003249 def test_io_after_close(self):
3250 for kwargs in [
3251 {"mode": "w"},
3252 {"mode": "wb"},
3253 {"mode": "w", "buffering": 1},
3254 {"mode": "w", "buffering": 2},
3255 {"mode": "wb", "buffering": 0},
3256 {"mode": "r"},
3257 {"mode": "rb"},
3258 {"mode": "r", "buffering": 1},
3259 {"mode": "r", "buffering": 2},
3260 {"mode": "rb", "buffering": 0},
3261 {"mode": "w+"},
3262 {"mode": "w+b"},
3263 {"mode": "w+", "buffering": 1},
3264 {"mode": "w+", "buffering": 2},
3265 {"mode": "w+b", "buffering": 0},
3266 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003267 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003268 f.close()
3269 self.assertRaises(ValueError, f.flush)
3270 self.assertRaises(ValueError, f.fileno)
3271 self.assertRaises(ValueError, f.isatty)
3272 self.assertRaises(ValueError, f.__iter__)
3273 if hasattr(f, "peek"):
3274 self.assertRaises(ValueError, f.peek, 1)
3275 self.assertRaises(ValueError, f.read)
3276 if hasattr(f, "read1"):
3277 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003278 if hasattr(f, "readall"):
3279 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003280 if hasattr(f, "readinto"):
3281 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003282 if hasattr(f, "readinto1"):
3283 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003284 self.assertRaises(ValueError, f.readline)
3285 self.assertRaises(ValueError, f.readlines)
3286 self.assertRaises(ValueError, f.seek, 0)
3287 self.assertRaises(ValueError, f.tell)
3288 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003289 self.assertRaises(ValueError, f.write,
3290 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003291 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003292 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003293
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003294 def test_blockingioerror(self):
3295 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003296 class C(str):
3297 pass
3298 c = C("")
3299 b = self.BlockingIOError(1, c)
3300 c.b = b
3301 b.c = c
3302 wr = weakref.ref(c)
3303 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003304 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003305 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003306
3307 def test_abcs(self):
3308 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003309 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3310 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3311 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3312 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003313
3314 def _check_abc_inheritance(self, abcmodule):
3315 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003316 self.assertIsInstance(f, abcmodule.IOBase)
3317 self.assertIsInstance(f, abcmodule.RawIOBase)
3318 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3319 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003320 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003321 self.assertIsInstance(f, abcmodule.IOBase)
3322 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3323 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3324 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003325 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003326 self.assertIsInstance(f, abcmodule.IOBase)
3327 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3328 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3329 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003330
3331 def test_abc_inheritance(self):
3332 # Test implementations inherit from their respective ABCs
3333 self._check_abc_inheritance(self)
3334
3335 def test_abc_inheritance_official(self):
3336 # Test implementations inherit from the official ABCs of the
3337 # baseline "io" module.
3338 self._check_abc_inheritance(io)
3339
Antoine Pitroue033e062010-10-29 10:38:18 +00003340 def _check_warn_on_dealloc(self, *args, **kwargs):
3341 f = open(*args, **kwargs)
3342 r = repr(f)
3343 with self.assertWarns(ResourceWarning) as cm:
3344 f = None
3345 support.gc_collect()
3346 self.assertIn(r, str(cm.warning.args[0]))
3347
3348 def test_warn_on_dealloc(self):
3349 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3350 self._check_warn_on_dealloc(support.TESTFN, "wb")
3351 self._check_warn_on_dealloc(support.TESTFN, "w")
3352
3353 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3354 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003355 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003356 for fd in fds:
3357 try:
3358 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003359 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003360 if e.errno != errno.EBADF:
3361 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003362 self.addCleanup(cleanup_fds)
3363 r, w = os.pipe()
3364 fds += r, w
3365 self._check_warn_on_dealloc(r, *args, **kwargs)
3366 # When using closefd=False, there's no warning
3367 r, w = os.pipe()
3368 fds += r, w
3369 with warnings.catch_warnings(record=True) as recorded:
3370 open(r, *args, closefd=False, **kwargs)
3371 support.gc_collect()
3372 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00003373
3374 def test_warn_on_dealloc_fd(self):
3375 self._check_warn_on_dealloc_fd("rb", buffering=0)
3376 self._check_warn_on_dealloc_fd("rb")
3377 self._check_warn_on_dealloc_fd("r")
3378
3379
Antoine Pitrou243757e2010-11-05 21:15:39 +00003380 def test_pickling(self):
3381 # Pickling file objects is forbidden
3382 for kwargs in [
3383 {"mode": "w"},
3384 {"mode": "wb"},
3385 {"mode": "wb", "buffering": 0},
3386 {"mode": "r"},
3387 {"mode": "rb"},
3388 {"mode": "rb", "buffering": 0},
3389 {"mode": "w+"},
3390 {"mode": "w+b"},
3391 {"mode": "w+b", "buffering": 0},
3392 ]:
3393 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3394 with self.open(support.TESTFN, **kwargs) as f:
3395 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3396
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003397 def test_nonblock_pipe_write_bigbuf(self):
3398 self._test_nonblock_pipe_write(16*1024)
3399
def test_nonblock_pipe_write_smallbuf(self):
    # 1 KiB buffer for the non-blocking pipe write scenario.
    self._test_nonblock_pipe_write(bufsize=1024)
3402
@unittest.skipUnless(hasattr(os, 'set_blocking'),
                     'os.set_blocking() required for this test')
def _test_nonblock_pipe_write(self, bufsize):
    """Write through a non-blocking pipe and check nothing is lost.

    Both ends of a pipe are made non-blocking and wrapped in buffered
    file objects using *bufsize* as buffer size.  Data is written until
    BlockingIOError is raised, the pipe is drained, and at the end the
    bytes recorded as sent must equal the bytes that were read back.
    """
    sent = []
    received = []
    r, w = os.pipe()
    os.set_blocking(r, False)
    os.set_blocking(w, False)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes.  For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
    wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

    with rf, wf:
        for N in 9999, 73, 7574:
            try:
                i = 0
                while True:
                    # N copies of one lowercase letter, cycling a..z.
                    msg = bytes([i % 26 + 97]) * N
                    sent.append(msg)
                    wf.write(msg)
                    i += 1

            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                # Only the part reported as written counts as sent.
                sent[-1] = sent[-1][:e.characters_written]
                received.append(rf.read())
                msg = b'BLOCKED'
                wf.write(msg)
                sent.append(msg)

        # Retry flush() until it succeeds, draining the pipe each time
        # it reports that it would block.
        while True:
            try:
                wf.flush()
                break
            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                self.assertEqual(e.characters_written, 0)
                received.append(rf.read())

        # Keep reading until read() returns None.
        received += iter(rf.read, None)

    sent, received = b''.join(sent), b''.join(received)
    self.assertEqual(sent, received)
    # Leaving the with block must have closed both file objects.
    self.assertTrue(wf.closed)
    self.assertTrue(rf.closed)
3454
def test_create_fail(self):
    # Exclusive-creation mode ('x') must fail when the file already exists.
    existing = self.open(support.TESTFN, 'w')
    existing.close()
    with self.assertRaises(FileExistsError):
        self.open(support.TESTFN, 'x')
3460
def test_create_writes(self):
    # A file opened in 'x' mode is writable; the data must read back intact.
    payload = b"spam"
    with self.open(support.TESTFN, 'xb') as created:
        created.write(payload)
    with self.open(support.TESTFN, 'rb') as reopened:
        contents = reopened.read()
    self.assertEqual(payload, contents)
3467
def test_open_allargs(self):
    # Regression check: the rawmode parser used to overflow a buffer
    # when given every mode character at once; it must raise ValueError.
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, 'rwax+')
3471
3472
class CMiscIOTest(MiscIOTest):
    # Run the shared MiscIOTest cases, plus the extra ones below, against
    # the `io` module.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025: readinto() must raise ValueError when read()
        # returns far more data than the target buffer can hold.
        class OversizedReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6

        target = bytearray(2)
        with self.assertRaises(ValueError):
            OversizedReader().readinto(target)

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        """Run a child interpreter where a daemon thread and the main
        thread both write to sys.<stream_name> while shutting down.

        Issue #23309: deadlocks at shutdown should be avoided.  The child
        must either exit cleanly having printed only '.' and '!' on
        stderr, or die with the dedicated fatal error message.
        """
        script = """if 1:
            import sys
            import time
            import threading

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        result, _ = run_python_until_end("-c", script)
        stderr_text = result.err.decode()
        if result.rc == 0:
            self.assertFalse(stderr_text.strip('.!'))
        else:
            # Failure: should be a fatal error
            expected = ("Fatal Python error: could not acquire lock "
                        "for <_io.BufferedWriter name='<{stream_name}>'> "
                        "at interpreter shutdown, possibly due to "
                        "daemon threads").format(stream_name=stream_name)
            self.assertIn(expected, stderr_text)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock(stream_name='stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock(stream_name='stderr')
3526
3527
class PyMiscIOTest(MiscIOTest):
    # Run the shared MiscIOTest cases against the `pyio` module.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003530
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003531
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Check how file objects behave when SIGALRM interrupts reads/writes.

    The class reads ``self.io`` for the module under test; CSignalsTest and
    PySignalsTest set it.  setUp() installs alarm_interrupt() as the SIGALRM
    handler and tearDown() restores the previous one.  Every helper cancels
    any pending alarm before returning, so the signal cannot fire once the
    previous handler is back in place.
    """

    def setUp(self):
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Default SIGALRM handler for these tests: raise ZeroDivisionError.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data to write (repeated until it exceeds
        support.PIPE_MAX_SIZE); *bytes* is its binary form, whose first
        two bytes are compared with what is read from the pipe.
        *fdopen_kwargs* are passed to self.io.open() (closefd is forced to
        False).
        """
        read_results = []
        def _read():
            # Keep SIGALRM away from this thread where the platform allows.
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block so that the cleanup below cannot hit
        # a NameError (hiding the real error) if open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed below.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    # Issue #22331: The test hangs on FreeBSD 7.2
    @support.requires_freebsd_version(8)
    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check a write() issued from a signal handler while the same
        thread is already writing to the same object.

        *data* is written in a loop until SIGALRM fires; the handler then
        writes *data* again.  Either that reentrant call fails with
        RuntimeError ("reentrant call ...") or the handler's
        ZeroDivisionError propagates.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel the alarm in case we got here before it fired.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to str for the
        comparison; *fdopen_kwargs* are passed to self.io.open() (closefd
        is forced to False).
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so the cleanup cannot raise NameError
        # if open() fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel the alarm in case we got here before it fired.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is a one-element str or bytes that is repeated
        support.PIPE_MAX_SIZE times; *fdopen_kwargs* are passed to
        self.io.open() (closefd is forced to False).
        """
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            nonlocal error
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand the failure over to the main thread.
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: chain a second one handled by alarm2().
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start draining the pipe.
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so the cleanup cannot raise NameError
        # if open() fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel any alarm that is still pending (alarm1 re-arms it).
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003740
class CSignalsTest(SignalsTest):
    # Run the SignalsTest cases against the `io` module.
    io = io
3743
class PySignalsTest(SignalsTest):
    # Run the SignalsTest cases against the `pyio` module.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled (the inherited test methods are overridden with
    # None).
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
3751
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003752
def load_tests(*args):
    """Build and return the test suite for this module.

    The positional arguments are accepted and ignored.  Before the suite
    is built, every test class whose name starts with "C" receives the
    public names of `io` (plus IncrementalNewlineDecoder) and the
    C-prefixed mock classes as class attributes; classes whose name
    starts with "Py" receive the `pyio` equivalents.  Other classes are
    left untouched.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock has a "C"- and a "Py"-prefixed variant at module level;
    # expose the matching one under the unprefixed name.
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated since Python 3.11 and removed in
    # 3.13; loadTestsFromTestCase() is the supported equivalent.
    loader = unittest.defaultTestLoader
    suite = unittest.TestSuite([loader.loadTestsFromTestCase(test)
                                for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003788
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()