blob: 86440c5b39b9a9d974d8d69699b1d6c3c0263824 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Berker Peksagce643912015-05-06 06:33:17 +030038from test.support.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000048def _default_chunk_size():
49 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000050 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000051 return f._CHUNK_SIZE
52
53
Antoine Pitrou328ec742010-09-14 18:37:24 +000054class MockRawIOWithoutRead:
55 """A RawIO implementation without read(), so as to exercise the default
56 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000057
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000058 def __init__(self, read_stack=()):
59 self._read_stack = list(read_stack)
60 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000061 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000062 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000063
Guido van Rossum01a27522007-03-07 01:00:12 +000064 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000065 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000066 return len(b)
67
68 def writable(self):
69 return True
70
Guido van Rossum68bbcd22007-02-27 17:19:33 +000071 def fileno(self):
72 return 42
73
74 def readable(self):
75 return True
76
Guido van Rossum01a27522007-03-07 01:00:12 +000077 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000078 return True
79
Guido van Rossum01a27522007-03-07 01:00:12 +000080 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000081 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000082
83 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000084 return 0 # same comment as above
85
86 def readinto(self, buf):
87 self._reads += 1
88 max_len = len(buf)
89 try:
90 data = self._read_stack[0]
91 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000092 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000093 return 0
94 if data is None:
95 del self._read_stack[0]
96 return None
97 n = len(data)
98 if len(data) <= max_len:
99 del self._read_stack[0]
100 buf[:n] = data
101 return n
102 else:
103 buf[:] = data[:max_len]
104 self._read_stack[0] = data[max_len:]
105 return max_len
106
107 def truncate(self, pos=None):
108 return pos
109
Antoine Pitrou328ec742010-09-14 18:37:24 +0000110class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
111 pass
112
113class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
114 pass
115
116
117class MockRawIO(MockRawIOWithoutRead):
118
119 def read(self, n=None):
120 self._reads += 1
121 try:
122 return self._read_stack.pop(0)
123 except:
124 self._extraneous_reads += 1
125 return b""
126
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000127class CMockRawIO(MockRawIO, io.RawIOBase):
128 pass
129
130class PyMockRawIO(MockRawIO, pyio.RawIOBase):
131 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000132
Guido van Rossuma9e20242007-03-08 00:43:48 +0000133
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000134class MisbehavedRawIO(MockRawIO):
135 def write(self, b):
136 return super().write(b) * 2
137
138 def read(self, n=None):
139 return super().read(n) * 2
140
141 def seek(self, pos, whence):
142 return -123
143
144 def tell(self):
145 return -456
146
147 def readinto(self, buf):
148 super().readinto(buf)
149 return len(buf) * 5
150
151class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
152 pass
153
154class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
155 pass
156
157
158class CloseFailureIO(MockRawIO):
159 closed = 0
160
161 def close(self):
162 if not self.closed:
163 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200164 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000165
166class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
167 pass
168
169class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
170 pass
171
172
173class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000174
175 def __init__(self, data):
176 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000177 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000178
179 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000181 self.read_history.append(None if res is None else len(res))
182 return res
183
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000184 def readinto(self, b):
185 res = super().readinto(b)
186 self.read_history.append(res)
187 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000188
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000189class CMockFileIO(MockFileIO, io.BytesIO):
190 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000191
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000192class PyMockFileIO(MockFileIO, pyio.BytesIO):
193 pass
194
195
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000196class MockUnseekableIO:
197 def seekable(self):
198 return False
199
200 def seek(self, *args):
201 raise self.UnsupportedOperation("not seekable")
202
203 def tell(self, *args):
204 raise self.UnsupportedOperation("not seekable")
205
206class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
207 UnsupportedOperation = io.UnsupportedOperation
208
209class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
210 UnsupportedOperation = pyio.UnsupportedOperation
211
212
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000213class MockNonBlockWriterIO:
214
215 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000216 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000217 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000218
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000219 def pop_written(self):
220 s = b"".join(self._write_stack)
221 self._write_stack[:] = []
222 return s
223
224 def block_on(self, char):
225 """Block when a given char is encountered."""
226 self._blocker_char = char
227
228 def readable(self):
229 return True
230
231 def seekable(self):
232 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000233
Guido van Rossum01a27522007-03-07 01:00:12 +0000234 def writable(self):
235 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000236
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000237 def write(self, b):
238 b = bytes(b)
239 n = -1
240 if self._blocker_char:
241 try:
242 n = b.index(self._blocker_char)
243 except ValueError:
244 pass
245 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100246 if n > 0:
247 # write data up to the first blocker
248 self._write_stack.append(b[:n])
249 return n
250 else:
251 # cancel blocker and indicate would block
252 self._blocker_char = None
253 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000254 self._write_stack.append(b)
255 return len(b)
256
257class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
258 BlockingIOError = io.BlockingIOError
259
260class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
261 BlockingIOError = pyio.BlockingIOError
262
Guido van Rossuma9e20242007-03-08 00:43:48 +0000263
Guido van Rossum28524c72007-02-27 05:47:44 +0000264class IOTest(unittest.TestCase):
265
Neal Norwitze7789b12008-03-24 06:18:09 +0000266 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000267 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000268
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000269 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000270 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000271
Guido van Rossum28524c72007-02-27 05:47:44 +0000272 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000273 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000274 f.truncate(0)
275 self.assertEqual(f.tell(), 5)
276 f.seek(0)
277
278 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000279 self.assertEqual(f.seek(0), 0)
280 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000281 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000282 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000283 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000284 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000285 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000286 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000287 self.assertEqual(f.seek(-1, 2), 13)
288 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000289
Guido van Rossum87429772007-04-10 21:06:59 +0000290 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000291 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000292 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000293
Guido van Rossum9b76da62007-04-11 01:09:03 +0000294 def read_ops(self, f, buffered=False):
295 data = f.read(5)
296 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000297 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000298 self.assertEqual(f.readinto(data), 5)
299 self.assertEqual(data, b" worl")
300 self.assertEqual(f.readinto(data), 2)
301 self.assertEqual(len(data), 5)
302 self.assertEqual(data[:2], b"d\n")
303 self.assertEqual(f.seek(0), 0)
304 self.assertEqual(f.read(20), b"hello world\n")
305 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000306 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000307 self.assertEqual(f.seek(-6, 2), 6)
308 self.assertEqual(f.read(5), b"world")
309 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000310 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000311 self.assertEqual(f.seek(-6, 1), 5)
312 self.assertEqual(f.read(5), b" worl")
313 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000314 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 if buffered:
316 f.seek(0)
317 self.assertEqual(f.read(), b"hello world\n")
318 f.seek(6)
319 self.assertEqual(f.read(), b"world\n")
320 self.assertEqual(f.read(), b"")
321
Guido van Rossum34d69e52007-04-10 20:08:41 +0000322 LARGE = 2**31
323
Guido van Rossum53807da2007-04-10 19:01:47 +0000324 def large_file_ops(self, f):
325 assert f.readable()
326 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000327 self.assertEqual(f.seek(self.LARGE), self.LARGE)
328 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000329 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000330 self.assertEqual(f.tell(), self.LARGE + 3)
331 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000332 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000333 self.assertEqual(f.tell(), self.LARGE + 2)
334 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000335 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000336 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000337 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
338 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000339 self.assertEqual(f.read(2), b"x")
340
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000341 def test_invalid_operations(self):
342 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000343 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000344 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000345 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000346 self.assertRaises(exc, fp.read)
347 self.assertRaises(exc, fp.readline)
348 with self.open(support.TESTFN, "wb", buffering=0) as fp:
349 self.assertRaises(exc, fp.read)
350 self.assertRaises(exc, fp.readline)
351 with self.open(support.TESTFN, "rb", buffering=0) as fp:
352 self.assertRaises(exc, fp.write, b"blah")
353 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000354 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000355 self.assertRaises(exc, fp.write, b"blah")
356 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000357 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000358 self.assertRaises(exc, fp.write, "blah")
359 self.assertRaises(exc, fp.writelines, ["blah\n"])
360 # Non-zero seeking from current or end pos
361 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
362 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000363
Antoine Pitrou13348842012-01-29 18:36:34 +0100364 def test_open_handles_NUL_chars(self):
365 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300366 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
367 self.assertRaises(ValueError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100368
Guido van Rossum28524c72007-02-27 05:47:44 +0000369 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000370 with self.open(support.TESTFN, "wb", buffering=0) as f:
371 self.assertEqual(f.readable(), False)
372 self.assertEqual(f.writable(), True)
373 self.assertEqual(f.seekable(), True)
374 self.write_ops(f)
375 with self.open(support.TESTFN, "rb", buffering=0) as f:
376 self.assertEqual(f.readable(), True)
377 self.assertEqual(f.writable(), False)
378 self.assertEqual(f.seekable(), True)
379 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000380
Guido van Rossum87429772007-04-10 21:06:59 +0000381 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000382 with self.open(support.TESTFN, "wb") as f:
383 self.assertEqual(f.readable(), False)
384 self.assertEqual(f.writable(), True)
385 self.assertEqual(f.seekable(), True)
386 self.write_ops(f)
387 with self.open(support.TESTFN, "rb") as f:
388 self.assertEqual(f.readable(), True)
389 self.assertEqual(f.writable(), False)
390 self.assertEqual(f.seekable(), True)
391 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000392
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000393 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000394 with self.open(support.TESTFN, "wb") as f:
395 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
396 with self.open(support.TESTFN, "rb") as f:
397 self.assertEqual(f.readline(), b"abc\n")
398 self.assertEqual(f.readline(10), b"def\n")
399 self.assertEqual(f.readline(2), b"xy")
400 self.assertEqual(f.readline(4), b"zzy\n")
401 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000402 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000403 self.assertRaises(TypeError, f.readline, 5.3)
404 with self.open(support.TESTFN, "r") as f:
405 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000406
Guido van Rossum28524c72007-02-27 05:47:44 +0000407 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000408 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000409 self.write_ops(f)
410 data = f.getvalue()
411 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000412 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000413 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000414
Guido van Rossum53807da2007-04-10 19:01:47 +0000415 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000416 # On Windows and Mac OSX this test comsumes large resources; It takes
417 # a long time to build the >2GB file and takes >2GB of disk space
418 # therefore the resource must be enabled to run this test.
419 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600420 support.requires(
421 'largefile',
422 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000423 with self.open(support.TESTFN, "w+b", 0) as f:
424 self.large_file_ops(f)
425 with self.open(support.TESTFN, "w+b") as f:
426 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000427
428 def test_with_open(self):
429 for bufsize in (0, 1, 100):
430 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000431 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000432 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000433 self.assertEqual(f.closed, True)
434 f = None
435 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000436 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000437 1/0
438 except ZeroDivisionError:
439 self.assertEqual(f.closed, True)
440 else:
441 self.fail("1/0 didn't raise an exception")
442
Antoine Pitrou08838b62009-01-21 00:55:13 +0000443 # issue 5008
444 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000445 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000446 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000447 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000448 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000449 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000450 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000451 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300452 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000453
Guido van Rossum87429772007-04-10 21:06:59 +0000454 def test_destructor(self):
455 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000456 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000457 def __del__(self):
458 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000459 try:
460 f = super().__del__
461 except AttributeError:
462 pass
463 else:
464 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000465 def close(self):
466 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000467 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000468 def flush(self):
469 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000470 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000471 with support.check_warnings(('', ResourceWarning)):
472 f = MyFileIO(support.TESTFN, "wb")
473 f.write(b"xxx")
474 del f
475 support.gc_collect()
476 self.assertEqual(record, [1, 2, 3])
477 with self.open(support.TESTFN, "rb") as f:
478 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000479
480 def _check_base_destructor(self, base):
481 record = []
482 class MyIO(base):
483 def __init__(self):
484 # This exercises the availability of attributes on object
485 # destruction.
486 # (in the C version, close() is called by the tp_dealloc
487 # function, not by __del__)
488 self.on_del = 1
489 self.on_close = 2
490 self.on_flush = 3
491 def __del__(self):
492 record.append(self.on_del)
493 try:
494 f = super().__del__
495 except AttributeError:
496 pass
497 else:
498 f()
499 def close(self):
500 record.append(self.on_close)
501 super().close()
502 def flush(self):
503 record.append(self.on_flush)
504 super().flush()
505 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000506 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000507 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000508 self.assertEqual(record, [1, 2, 3])
509
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000510 def test_IOBase_destructor(self):
511 self._check_base_destructor(self.IOBase)
512
513 def test_RawIOBase_destructor(self):
514 self._check_base_destructor(self.RawIOBase)
515
516 def test_BufferedIOBase_destructor(self):
517 self._check_base_destructor(self.BufferedIOBase)
518
519 def test_TextIOBase_destructor(self):
520 self._check_base_destructor(self.TextIOBase)
521
Guido van Rossum87429772007-04-10 21:06:59 +0000522 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000523 with self.open(support.TESTFN, "wb") as f:
524 f.write(b"xxx")
525 with self.open(support.TESTFN, "rb") as f:
526 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000527
Guido van Rossumd4103952007-04-12 05:44:49 +0000528 def test_array_writes(self):
529 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000530 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000531 with self.open(support.TESTFN, "wb", 0) as f:
532 self.assertEqual(f.write(a), n)
533 with self.open(support.TESTFN, "wb") as f:
534 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000535
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000536 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000537 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000538 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000539
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000540 def test_read_closed(self):
541 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000542 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000543 with self.open(support.TESTFN, "r") as f:
544 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000545 self.assertEqual(file.read(), "egg\n")
546 file.seek(0)
547 file.close()
548 self.assertRaises(ValueError, file.read)
549
550 def test_no_closefd_with_filename(self):
551 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000552 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000553
554 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000555 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000556 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000557 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000558 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000559 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000560 self.assertEqual(file.buffer.raw.closefd, False)
561
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000562 def test_garbage_collection(self):
563 # FileIO objects are collected, and collecting them flushes
564 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000565 with support.check_warnings(('', ResourceWarning)):
566 f = self.FileIO(support.TESTFN, "wb")
567 f.write(b"abcxxx")
568 f.f = f
569 wr = weakref.ref(f)
570 del f
571 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300572 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000573 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000574 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000575
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000576 def test_unbounded_file(self):
577 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
578 zero = "/dev/zero"
579 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000580 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000581 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000582 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000583 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000584 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000585 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000586 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000587 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000588 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000589 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000590 self.assertRaises(OverflowError, f.read)
591
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200592 def check_flush_error_on_close(self, *args, **kwargs):
593 # Test that the file is closed despite failed flush
594 # and that flush() is called before file closed.
595 f = self.open(*args, **kwargs)
596 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000597 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200598 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200599 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000600 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200601 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600602 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200603 self.assertTrue(closed) # flush() called
604 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200605 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200606
607 def test_flush_error_on_close(self):
608 # raw file
609 # Issue #5700: io.FileIO calls flush() after file closed
610 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
611 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
612 self.check_flush_error_on_close(fd, 'wb', buffering=0)
613 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
614 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
615 os.close(fd)
616 # buffered io
617 self.check_flush_error_on_close(support.TESTFN, 'wb')
618 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
619 self.check_flush_error_on_close(fd, 'wb')
620 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
621 self.check_flush_error_on_close(fd, 'wb', closefd=False)
622 os.close(fd)
623 # text io
624 self.check_flush_error_on_close(support.TESTFN, 'w')
625 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
626 self.check_flush_error_on_close(fd, 'w')
627 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
628 self.check_flush_error_on_close(fd, 'w', closefd=False)
629 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000630
631 def test_multi_close(self):
632 f = self.open(support.TESTFN, "wb", buffering=0)
633 f.close()
634 f.close()
635 f.close()
636 self.assertRaises(ValueError, f.flush)
637
Antoine Pitrou328ec742010-09-14 18:37:24 +0000638 def test_RawIOBase_read(self):
639 # Exercise the default RawIOBase.read() implementation (which calls
640 # readinto() internally).
641 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
642 self.assertEqual(rawio.read(2), b"ab")
643 self.assertEqual(rawio.read(2), b"c")
644 self.assertEqual(rawio.read(2), b"d")
645 self.assertEqual(rawio.read(2), None)
646 self.assertEqual(rawio.read(2), b"ef")
647 self.assertEqual(rawio.read(2), b"g")
648 self.assertEqual(rawio.read(2), None)
649 self.assertEqual(rawio.read(2), b"")
650
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400651 def test_types_have_dict(self):
652 test = (
653 self.IOBase(),
654 self.RawIOBase(),
655 self.TextIOBase(),
656 self.StringIO(),
657 self.BytesIO()
658 )
659 for obj in test:
660 self.assertTrue(hasattr(obj, "__dict__"))
661
Ross Lagerwall59142db2011-10-31 20:34:46 +0200662 def test_opener(self):
663 with self.open(support.TESTFN, "w") as f:
664 f.write("egg\n")
665 fd = os.open(support.TESTFN, os.O_RDONLY)
666 def opener(path, flags):
667 return fd
668 with self.open("non-existent", "r", opener=opener) as f:
669 self.assertEqual(f.read(), "egg\n")
670
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200671 def test_fileio_closefd(self):
672 # Issue #4841
673 with self.open(__file__, 'rb') as f1, \
674 self.open(__file__, 'rb') as f2:
675 fileio = self.FileIO(f1.fileno(), closefd=False)
676 # .__init__() must not close f1
677 fileio.__init__(f2.fileno(), closefd=False)
678 f1.readline()
679 # .close() must not close f2
680 fileio.close()
681 f2.readline()
682
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300683 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200684 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300685 with self.assertRaises(ValueError):
686 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300687
688 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200689 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300690 with self.assertRaises(ValueError):
691 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300692
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200693
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000694class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200695
696 def test_IOBase_finalize(self):
697 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
698 # class which inherits IOBase and an object of this class are caught
699 # in a reference cycle and close() is already in the method cache.
700 class MyIO(self.IOBase):
701 def close(self):
702 pass
703
704 # create an instance to populate the method cache
705 MyIO()
706 obj = MyIO()
707 obj.obj = obj
708 wr = weakref.ref(obj)
709 del MyIO
710 del obj
711 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300712 self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000713
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000714class PyIOTest(IOTest):
715 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000716
Guido van Rossuma9e20242007-03-08 00:43:48 +0000717
Gregory P. Smith1bef9072015-04-14 13:24:34 -0700718@support.cpython_only
719class APIMismatchTest(unittest.TestCase):
Gregory P. Smith054b0652015-04-14 12:58:05 -0700720
Gregory P. Smith054b0652015-04-14 12:58:05 -0700721 def test_RawIOBase_io_in_pyio_match(self):
722 """Test that pyio RawIOBase class has all c RawIOBase methods"""
Antoine Pitrou45d61562015-05-20 21:50:59 +0200723 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
724 ignore=('__weakref__',))
Gregory P. Smith054b0652015-04-14 12:58:05 -0700725 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
726
727 def test_RawIOBase_pyio_in_io_match(self):
728 """Test that c RawIOBase class has all pyio RawIOBase methods"""
729 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
730 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
731
732
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000733class CommonBufferedTests:
734 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
735
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000736 def test_detach(self):
737 raw = self.MockRawIO()
738 buf = self.tp(raw)
739 self.assertIs(buf.detach(), raw)
740 self.assertRaises(ValueError, buf.detach)
741
Benjamin Peterson10e76b62014-12-21 20:51:50 -0600742 repr(buf) # Should still work
743
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000744 def test_fileno(self):
745 rawio = self.MockRawIO()
746 bufio = self.tp(rawio)
747
Ezio Melottib3aedd42010-11-20 19:04:17 +0000748 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000749
Zachary Ware9fe6d862013-12-08 00:20:35 -0600750 @unittest.skip('test having existential crisis')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000751 def test_no_fileno(self):
752 # XXX will we always have fileno() function? If so, kill
753 # this test. Else, write it.
754 pass
755
756 def test_invalid_args(self):
757 rawio = self.MockRawIO()
758 bufio = self.tp(rawio)
759 # Invalid whence
760 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200761 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000762
763 def test_override_destructor(self):
764 tp = self.tp
765 record = []
766 class MyBufferedIO(tp):
767 def __del__(self):
768 record.append(1)
769 try:
770 f = super().__del__
771 except AttributeError:
772 pass
773 else:
774 f()
775 def close(self):
776 record.append(2)
777 super().close()
778 def flush(self):
779 record.append(3)
780 super().flush()
781 rawio = self.MockRawIO()
782 bufio = MyBufferedIO(rawio)
783 writable = bufio.writable()
784 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000785 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000786 if writable:
787 self.assertEqual(record, [1, 2, 3])
788 else:
789 self.assertEqual(record, [1, 2])
790
791 def test_context_manager(self):
792 # Test usability as a context manager
793 rawio = self.MockRawIO()
794 bufio = self.tp(rawio)
795 def _with():
796 with bufio:
797 pass
798 _with()
799 # bufio should now be closed, and using it a second time should raise
800 # a ValueError.
801 self.assertRaises(ValueError, _with)
802
803 def test_error_through_destructor(self):
804 # Test that the exception state is not modified by a destructor,
805 # even if close() fails.
806 rawio = self.CloseFailureIO()
807 def f():
808 self.tp(rawio).xyzzy
809 with support.captured_output("stderr") as s:
810 self.assertRaises(AttributeError, f)
811 s = s.getvalue().strip()
812 if s:
813 # The destructor *may* have printed an unraisable error, check it
814 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200815 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000816 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000817
Antoine Pitrou716c4442009-05-23 19:04:03 +0000818 def test_repr(self):
819 raw = self.MockRawIO()
820 b = self.tp(raw)
Serhiy Storchaka521e5862014-07-22 15:00:37 +0300821 clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
Antoine Pitrou716c4442009-05-23 19:04:03 +0000822 self.assertEqual(repr(b), "<%s>" % clsname)
823 raw.name = "dummy"
824 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
825 raw.name = b"dummy"
826 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
827
Antoine Pitrou6be88762010-05-03 16:48:20 +0000828 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200829 # Test that buffered file is closed despite failed flush
830 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +0000831 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200832 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000833 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200834 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200835 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000836 raw.flush = bad_flush
837 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200838 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600839 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200840 self.assertTrue(raw.closed)
841 self.assertTrue(closed) # flush() called
842 self.assertFalse(closed[0]) # flush() called before file closed
843 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200844 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -0600845
846 def test_close_error_on_close(self):
847 raw = self.MockRawIO()
848 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200849 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -0600850 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200851 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -0600852 raw.close = bad_close
853 b = self.tp(raw)
854 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200855 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600856 b.close()
857 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +0300858 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -0600859 self.assertEqual(err.exception.__context__.args, ('flush',))
860 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000861
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +0300862 def test_nonnormalized_close_error_on_close(self):
863 # Issue #21677
864 raw = self.MockRawIO()
865 def bad_flush():
866 raise non_existing_flush
867 def bad_close():
868 raise non_existing_close
869 raw.close = bad_close
870 b = self.tp(raw)
871 b.flush = bad_flush
872 with self.assertRaises(NameError) as err: # exception not swallowed
873 b.close()
874 self.assertIn('non_existing_close', str(err.exception))
875 self.assertIsInstance(err.exception.__context__, NameError)
876 self.assertIn('non_existing_flush', str(err.exception.__context__))
877 self.assertFalse(b.closed)
878
Antoine Pitrou6be88762010-05-03 16:48:20 +0000879 def test_multi_close(self):
880 raw = self.MockRawIO()
881 b = self.tp(raw)
882 b.close()
883 b.close()
884 b.close()
885 self.assertRaises(ValueError, b.flush)
886
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000887 def test_unseekable(self):
888 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
889 self.assertRaises(self.UnsupportedOperation, bufio.tell)
890 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
891
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000892 def test_readonly_attributes(self):
893 raw = self.MockRawIO()
894 buf = self.tp(raw)
895 x = self.MockRawIO()
896 with self.assertRaises(AttributeError):
897 buf.raw = x
898
Guido van Rossum78892e42007-04-06 17:31:18 +0000899
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200900class SizeofTest:
901
902 @support.cpython_only
903 def test_sizeof(self):
904 bufsize1 = 4096
905 bufsize2 = 8192
906 rawio = self.MockRawIO()
907 bufio = self.tp(rawio, buffer_size=bufsize1)
908 size = sys.getsizeof(bufio) - bufsize1
909 rawio = self.MockRawIO()
910 bufio = self.tp(rawio, buffer_size=bufsize2)
911 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
912
Jesus Ceadc469452012-10-04 12:37:56 +0200913 @support.cpython_only
914 def test_buffer_freeing(self) :
915 bufsize = 4096
916 rawio = self.MockRawIO()
917 bufio = self.tp(rawio, buffer_size=bufsize)
918 size = sys.getsizeof(bufio) - bufsize
919 bufio.close()
920 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200921
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000922class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
923 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000924
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000925 def test_constructor(self):
926 rawio = self.MockRawIO([b"abc"])
927 bufio = self.tp(rawio)
928 bufio.__init__(rawio)
929 bufio.__init__(rawio, buffer_size=1024)
930 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000931 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000932 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
933 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
934 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
935 rawio = self.MockRawIO([b"abc"])
936 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000937 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000938
Serhiy Storchaka61e24932014-02-12 10:52:35 +0200939 def test_uninitialized(self):
940 bufio = self.tp.__new__(self.tp)
941 del bufio
942 bufio = self.tp.__new__(self.tp)
943 self.assertRaisesRegex((ValueError, AttributeError),
944 'uninitialized|has no attribute',
945 bufio.read, 0)
946 bufio.__init__(self.MockRawIO())
947 self.assertEqual(bufio.read(0), b'')
948
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000949 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000950 for arg in (None, 7):
951 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
952 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000953 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000954 # Invalid args
955 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000956
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000957 def test_read1(self):
958 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
959 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000960 self.assertEqual(b"a", bufio.read(1))
961 self.assertEqual(b"b", bufio.read1(1))
962 self.assertEqual(rawio._reads, 1)
963 self.assertEqual(b"c", bufio.read1(100))
964 self.assertEqual(rawio._reads, 1)
965 self.assertEqual(b"d", bufio.read1(100))
966 self.assertEqual(rawio._reads, 2)
967 self.assertEqual(b"efg", bufio.read1(100))
968 self.assertEqual(rawio._reads, 3)
969 self.assertEqual(b"", bufio.read1(100))
970 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000971 # Invalid args
972 self.assertRaises(ValueError, bufio.read1, -1)
973
974 def test_readinto(self):
975 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
976 bufio = self.tp(rawio)
977 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000978 self.assertEqual(bufio.readinto(b), 2)
979 self.assertEqual(b, b"ab")
980 self.assertEqual(bufio.readinto(b), 2)
981 self.assertEqual(b, b"cd")
982 self.assertEqual(bufio.readinto(b), 2)
983 self.assertEqual(b, b"ef")
984 self.assertEqual(bufio.readinto(b), 1)
985 self.assertEqual(b, b"gf")
986 self.assertEqual(bufio.readinto(b), 0)
987 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +0200988 rawio = self.MockRawIO((b"abc", None))
989 bufio = self.tp(rawio)
990 self.assertEqual(bufio.readinto(b), 2)
991 self.assertEqual(b, b"ab")
992 self.assertEqual(bufio.readinto(b), 1)
993 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000994
Benjamin Petersona96fea02014-06-22 14:17:44 -0700995 def test_readinto1(self):
996 buffer_size = 10
997 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
998 bufio = self.tp(rawio, buffer_size=buffer_size)
999 b = bytearray(2)
1000 self.assertEqual(bufio.peek(3), b'abc')
1001 self.assertEqual(rawio._reads, 1)
1002 self.assertEqual(bufio.readinto1(b), 2)
1003 self.assertEqual(b, b"ab")
1004 self.assertEqual(rawio._reads, 1)
1005 self.assertEqual(bufio.readinto1(b), 1)
1006 self.assertEqual(b[:1], b"c")
1007 self.assertEqual(rawio._reads, 1)
1008 self.assertEqual(bufio.readinto1(b), 2)
1009 self.assertEqual(b, b"de")
1010 self.assertEqual(rawio._reads, 2)
1011 b = bytearray(2*buffer_size)
1012 self.assertEqual(bufio.peek(3), b'fgh')
1013 self.assertEqual(rawio._reads, 3)
1014 self.assertEqual(bufio.readinto1(b), 6)
1015 self.assertEqual(b[:6], b"fghjkl")
1016 self.assertEqual(rawio._reads, 4)
1017
1018 def test_readinto_array(self):
1019 buffer_size = 60
1020 data = b"a" * 26
1021 rawio = self.MockRawIO((data,))
1022 bufio = self.tp(rawio, buffer_size=buffer_size)
1023
1024 # Create an array with element size > 1 byte
1025 b = array.array('i', b'x' * 32)
1026 assert len(b) != 16
1027
1028 # Read into it. We should get as many *bytes* as we can fit into b
1029 # (which is more than the number of elements)
1030 n = bufio.readinto(b)
1031 self.assertGreater(n, len(b))
1032
1033 # Check that old contents of b are preserved
1034 bm = memoryview(b).cast('B')
1035 self.assertLess(n, len(bm))
1036 self.assertEqual(bm[:n], data[:n])
1037 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1038
1039 def test_readinto1_array(self):
1040 buffer_size = 60
1041 data = b"a" * 26
1042 rawio = self.MockRawIO((data,))
1043 bufio = self.tp(rawio, buffer_size=buffer_size)
1044
1045 # Create an array with element size > 1 byte
1046 b = array.array('i', b'x' * 32)
1047 assert len(b) != 16
1048
1049 # Read into it. We should get as many *bytes* as we can fit into b
1050 # (which is more than the number of elements)
1051 n = bufio.readinto1(b)
1052 self.assertGreater(n, len(b))
1053
1054 # Check that old contents of b are preserved
1055 bm = memoryview(b).cast('B')
1056 self.assertLess(n, len(bm))
1057 self.assertEqual(bm[:n], data[:n])
1058 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1059
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001060 def test_readlines(self):
1061 def bufio():
1062 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1063 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001064 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1065 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1066 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001067
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001068 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001069 data = b"abcdefghi"
1070 dlen = len(data)
1071
1072 tests = [
1073 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1074 [ 100, [ 3, 3, 3], [ dlen ] ],
1075 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1076 ]
1077
1078 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001079 rawio = self.MockFileIO(data)
1080 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001081 pos = 0
1082 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001083 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001084 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001085 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001086 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001087
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001088 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001089 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001090 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1091 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001092 self.assertEqual(b"abcd", bufio.read(6))
1093 self.assertEqual(b"e", bufio.read(1))
1094 self.assertEqual(b"fg", bufio.read())
1095 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001096 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001097 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001098
Victor Stinnera80987f2011-05-25 22:47:16 +02001099 rawio = self.MockRawIO((b"a", None, None))
1100 self.assertEqual(b"a", rawio.readall())
1101 self.assertIsNone(rawio.readall())
1102
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001103 def test_read_past_eof(self):
1104 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1105 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001106
Ezio Melottib3aedd42010-11-20 19:04:17 +00001107 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001108
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001109 def test_read_all(self):
1110 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1111 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001112
Ezio Melottib3aedd42010-11-20 19:04:17 +00001113 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001114
Victor Stinner45df8202010-04-28 22:31:17 +00001115 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001116 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001117 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001118 try:
1119 # Write out many bytes with exactly the same number of 0's,
1120 # 1's... 255's. This will help us check that concurrent reading
1121 # doesn't duplicate or forget contents.
1122 N = 1000
1123 l = list(range(256)) * N
1124 random.shuffle(l)
1125 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001126 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001127 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001128 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001129 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001130 errors = []
1131 results = []
1132 def f():
1133 try:
1134 # Intra-buffer read then buffer-flushing read
1135 for n in cycle([1, 19]):
1136 s = bufio.read(n)
1137 if not s:
1138 break
1139 # list.append() is atomic
1140 results.append(s)
1141 except Exception as e:
1142 errors.append(e)
1143 raise
1144 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001145 with support.start_threads(threads):
1146 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001147 self.assertFalse(errors,
1148 "the following exceptions were caught: %r" % errors)
1149 s = b''.join(results)
1150 for i in range(256):
1151 c = bytes(bytearray([i]))
1152 self.assertEqual(s.count(c), N)
1153 finally:
1154 support.unlink(support.TESTFN)
1155
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001156 def test_unseekable(self):
1157 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1158 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1159 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1160 bufio.read(1)
1161 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1162 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1163
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001164 def test_misbehaved_io(self):
1165 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1166 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001167 self.assertRaises(OSError, bufio.seek, 0)
1168 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001169
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001170 def test_no_extraneous_read(self):
1171 # Issue #9550; when the raw IO object has satisfied the read request,
1172 # we should not issue any additional reads, otherwise it may block
1173 # (e.g. socket).
1174 bufsize = 16
1175 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1176 rawio = self.MockRawIO([b"x" * n])
1177 bufio = self.tp(rawio, bufsize)
1178 self.assertEqual(bufio.read(n), b"x" * n)
1179 # Simple case: one raw read is enough to satisfy the request.
1180 self.assertEqual(rawio._extraneous_reads, 0,
1181 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1182 # A more complex case where two raw reads are needed to satisfy
1183 # the request.
1184 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1185 bufio = self.tp(rawio, bufsize)
1186 self.assertEqual(bufio.read(n), b"x" * n)
1187 self.assertEqual(rawio._extraneous_reads, 0,
1188 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1189
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001190 def test_read_on_closed(self):
1191 # Issue #23796
1192 b = io.BufferedReader(io.BytesIO(b"12"))
1193 b.read(1)
1194 b.close()
1195 self.assertRaises(ValueError, b.peek)
1196 self.assertRaises(ValueError, b.read1, 1)
1197
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001198
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001199class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001200 tp = io.BufferedReader
1201
1202 def test_constructor(self):
1203 BufferedReaderTest.test_constructor(self)
1204 # The allocation can succeed on 32-bit builds, e.g. with more
1205 # than 2GB RAM and a 64-bit kernel.
1206 if sys.maxsize > 0x7FFFFFFF:
1207 rawio = self.MockRawIO()
1208 bufio = self.tp(rawio)
1209 self.assertRaises((OverflowError, MemoryError, ValueError),
1210 bufio.__init__, rawio, sys.maxsize)
1211
1212 def test_initialization(self):
1213 rawio = self.MockRawIO([b"abc"])
1214 bufio = self.tp(rawio)
1215 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1216 self.assertRaises(ValueError, bufio.read)
1217 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1218 self.assertRaises(ValueError, bufio.read)
1219 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1220 self.assertRaises(ValueError, bufio.read)
1221
1222 def test_misbehaved_io_read(self):
1223 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1224 bufio = self.tp(rawio)
1225 # _pyio.BufferedReader seems to implement reading different, so that
1226 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001227 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001228
1229 def test_garbage_collection(self):
1230 # C BufferedReader objects are collected.
1231 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001232 with support.check_warnings(('', ResourceWarning)):
1233 rawio = self.FileIO(support.TESTFN, "w+b")
1234 f = self.tp(rawio)
1235 f.f = f
1236 wr = weakref.ref(f)
1237 del f
1238 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001239 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001240
R David Murray67bfe802013-02-23 21:51:05 -05001241 def test_args_error(self):
1242 # Issue #17275
1243 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1244 self.tp(io.BytesIO(), 1024, 1024, 1024)
1245
1246
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001247class PyBufferedReaderTest(BufferedReaderTest):
1248 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001249
Guido van Rossuma9e20242007-03-08 00:43:48 +00001250
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001251class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1252 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001253
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001254 def test_constructor(self):
1255 rawio = self.MockRawIO()
1256 bufio = self.tp(rawio)
1257 bufio.__init__(rawio)
1258 bufio.__init__(rawio, buffer_size=1024)
1259 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001260 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001261 bufio.flush()
1262 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1263 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1264 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1265 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001266 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001267 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001268 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001269
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001270 def test_uninitialized(self):
1271 bufio = self.tp.__new__(self.tp)
1272 del bufio
1273 bufio = self.tp.__new__(self.tp)
1274 self.assertRaisesRegex((ValueError, AttributeError),
1275 'uninitialized|has no attribute',
1276 bufio.write, b'')
1277 bufio.__init__(self.MockRawIO())
1278 self.assertEqual(bufio.write(b''), 0)
1279
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001280 def test_detach_flush(self):
1281 raw = self.MockRawIO()
1282 buf = self.tp(raw)
1283 buf.write(b"howdy!")
1284 self.assertFalse(raw._write_stack)
1285 buf.detach()
1286 self.assertEqual(raw._write_stack, [b"howdy!"])
1287
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001288 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001289 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001290 writer = self.MockRawIO()
1291 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001292 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001293 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001294
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001295 def test_write_overflow(self):
1296 writer = self.MockRawIO()
1297 bufio = self.tp(writer, 8)
1298 contents = b"abcdefghijklmnop"
1299 for n in range(0, len(contents), 3):
1300 bufio.write(contents[n:n+3])
1301 flushed = b"".join(writer._write_stack)
1302 # At least (total - 8) bytes were implicitly flushed, perhaps more
1303 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001304 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001305
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001306 def check_writes(self, intermediate_func):
1307 # Lots of writes, test the flushed output is as expected.
1308 contents = bytes(range(256)) * 1000
1309 n = 0
1310 writer = self.MockRawIO()
1311 bufio = self.tp(writer, 13)
1312 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1313 def gen_sizes():
1314 for size in count(1):
1315 for i in range(15):
1316 yield size
1317 sizes = gen_sizes()
1318 while n < len(contents):
1319 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001320 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001321 intermediate_func(bufio)
1322 n += size
1323 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001324 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001325
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001326 def test_writes(self):
1327 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001328
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001329 def test_writes_and_flushes(self):
1330 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001331
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001332 def test_writes_and_seeks(self):
1333 def _seekabs(bufio):
1334 pos = bufio.tell()
1335 bufio.seek(pos + 1, 0)
1336 bufio.seek(pos - 1, 0)
1337 bufio.seek(pos, 0)
1338 self.check_writes(_seekabs)
1339 def _seekrel(bufio):
1340 pos = bufio.seek(0, 1)
1341 bufio.seek(+1, 1)
1342 bufio.seek(-1, 1)
1343 bufio.seek(pos, 0)
1344 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001345
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001346 def test_writes_and_truncates(self):
1347 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001348
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001349 def test_write_non_blocking(self):
1350 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001351 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001352
Ezio Melottib3aedd42010-11-20 19:04:17 +00001353 self.assertEqual(bufio.write(b"abcd"), 4)
1354 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001355 # 1 byte will be written, the rest will be buffered
1356 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001357 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001358
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001359 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1360 raw.block_on(b"0")
1361 try:
1362 bufio.write(b"opqrwxyz0123456789")
1363 except self.BlockingIOError as e:
1364 written = e.characters_written
1365 else:
1366 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001367 self.assertEqual(written, 16)
1368 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001369 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001370
Ezio Melottib3aedd42010-11-20 19:04:17 +00001371 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001372 s = raw.pop_written()
1373 # Previously buffered bytes were flushed
1374 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001375
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001376 def test_write_and_rewind(self):
1377 raw = io.BytesIO()
1378 bufio = self.tp(raw, 4)
1379 self.assertEqual(bufio.write(b"abcdef"), 6)
1380 self.assertEqual(bufio.tell(), 6)
1381 bufio.seek(0, 0)
1382 self.assertEqual(bufio.write(b"XY"), 2)
1383 bufio.seek(6, 0)
1384 self.assertEqual(raw.getvalue(), b"XYcdef")
1385 self.assertEqual(bufio.write(b"123456"), 6)
1386 bufio.flush()
1387 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001388
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001389 def test_flush(self):
1390 writer = self.MockRawIO()
1391 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001392 bufio.write(b"abc")
1393 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001394 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001395
Antoine Pitrou131a4892012-10-16 22:57:11 +02001396 def test_writelines(self):
1397 l = [b'ab', b'cd', b'ef']
1398 writer = self.MockRawIO()
1399 bufio = self.tp(writer, 8)
1400 bufio.writelines(l)
1401 bufio.flush()
1402 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1403
1404 def test_writelines_userlist(self):
1405 l = UserList([b'ab', b'cd', b'ef'])
1406 writer = self.MockRawIO()
1407 bufio = self.tp(writer, 8)
1408 bufio.writelines(l)
1409 bufio.flush()
1410 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1411
1412 def test_writelines_error(self):
1413 writer = self.MockRawIO()
1414 bufio = self.tp(writer, 8)
1415 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1416 self.assertRaises(TypeError, bufio.writelines, None)
1417 self.assertRaises(TypeError, bufio.writelines, 'abc')
1418
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001419 def test_destructor(self):
1420 writer = self.MockRawIO()
1421 bufio = self.tp(writer, 8)
1422 bufio.write(b"abc")
1423 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001424 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001425 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001426
1427 def test_truncate(self):
1428 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001429 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001430 bufio = self.tp(raw, 8)
1431 bufio.write(b"abcdef")
1432 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001433 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001434 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001435 self.assertEqual(f.read(), b"abc")
1436
Victor Stinner45df8202010-04-28 22:31:17 +00001437 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001438 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001439 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001440 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001441 # Write out many bytes from many threads and test they were
1442 # all flushed.
1443 N = 1000
1444 contents = bytes(range(256)) * N
1445 sizes = cycle([1, 19])
1446 n = 0
1447 queue = deque()
1448 while n < len(contents):
1449 size = next(sizes)
1450 queue.append(contents[n:n+size])
1451 n += size
1452 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001453 # We use a real file object because it allows us to
1454 # exercise situations where the GIL is released before
1455 # writing the buffer to the raw streams. This is in addition
1456 # to concurrency issues due to switching threads in the middle
1457 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001458 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001459 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001460 errors = []
1461 def f():
1462 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001463 while True:
1464 try:
1465 s = queue.popleft()
1466 except IndexError:
1467 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001468 bufio.write(s)
1469 except Exception as e:
1470 errors.append(e)
1471 raise
1472 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001473 with support.start_threads(threads):
1474 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001475 self.assertFalse(errors,
1476 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001477 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001478 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001479 s = f.read()
1480 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001481 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001482 finally:
1483 support.unlink(support.TESTFN)
1484
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001485 def test_misbehaved_io(self):
1486 rawio = self.MisbehavedRawIO()
1487 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001488 self.assertRaises(OSError, bufio.seek, 0)
1489 self.assertRaises(OSError, bufio.tell)
1490 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001491
Florent Xicluna109d5732012-07-07 17:03:22 +02001492 def test_max_buffer_size_removal(self):
1493 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001494 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001495
Benjamin Peterson68623612012-12-20 11:53:11 -06001496 def test_write_error_on_close(self):
1497 raw = self.MockRawIO()
1498 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001499 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001500 raw.write = bad_write
1501 b = self.tp(raw)
1502 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001503 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001504 self.assertTrue(b.closed)
1505
Benjamin Peterson59406a92009-03-26 17:10:29 +00001506
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001507class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001508 tp = io.BufferedWriter
1509
1510 def test_constructor(self):
1511 BufferedWriterTest.test_constructor(self)
1512 # The allocation can succeed on 32-bit builds, e.g. with more
1513 # than 2GB RAM and a 64-bit kernel.
1514 if sys.maxsize > 0x7FFFFFFF:
1515 rawio = self.MockRawIO()
1516 bufio = self.tp(rawio)
1517 self.assertRaises((OverflowError, MemoryError, ValueError),
1518 bufio.__init__, rawio, sys.maxsize)
1519
1520 def test_initialization(self):
1521 rawio = self.MockRawIO()
1522 bufio = self.tp(rawio)
1523 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1524 self.assertRaises(ValueError, bufio.write, b"def")
1525 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1526 self.assertRaises(ValueError, bufio.write, b"def")
1527 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1528 self.assertRaises(ValueError, bufio.write, b"def")
1529
1530 def test_garbage_collection(self):
1531 # C BufferedWriter objects are collected, and collecting them flushes
1532 # all data to disk.
1533 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001534 with support.check_warnings(('', ResourceWarning)):
1535 rawio = self.FileIO(support.TESTFN, "w+b")
1536 f = self.tp(rawio)
1537 f.write(b"123xxx")
1538 f.x = f
1539 wr = weakref.ref(f)
1540 del f
1541 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001542 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001543 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001544 self.assertEqual(f.read(), b"123xxx")
1545
R David Murray67bfe802013-02-23 21:51:05 -05001546 def test_args_error(self):
1547 # Issue #17275
1548 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1549 self.tp(io.BytesIO(), 1024, 1024, 1024)
1550
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001551
1552class PyBufferedWriterTest(BufferedWriterTest):
1553 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001554
Guido van Rossum01a27522007-03-07 01:00:12 +00001555class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001556
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001557 def test_constructor(self):
1558 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001559 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001560
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001561 def test_uninitialized(self):
1562 pair = self.tp.__new__(self.tp)
1563 del pair
1564 pair = self.tp.__new__(self.tp)
1565 self.assertRaisesRegex((ValueError, AttributeError),
1566 'uninitialized|has no attribute',
1567 pair.read, 0)
1568 self.assertRaisesRegex((ValueError, AttributeError),
1569 'uninitialized|has no attribute',
1570 pair.write, b'')
1571 pair.__init__(self.MockRawIO(), self.MockRawIO())
1572 self.assertEqual(pair.read(0), b'')
1573 self.assertEqual(pair.write(b''), 0)
1574
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001575 def test_detach(self):
1576 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1577 self.assertRaises(self.UnsupportedOperation, pair.detach)
1578
Florent Xicluna109d5732012-07-07 17:03:22 +02001579 def test_constructor_max_buffer_size_removal(self):
1580 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001581 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001582
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001583 def test_constructor_with_not_readable(self):
1584 class NotReadable(MockRawIO):
1585 def readable(self):
1586 return False
1587
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001588 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001589
1590 def test_constructor_with_not_writeable(self):
1591 class NotWriteable(MockRawIO):
1592 def writable(self):
1593 return False
1594
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001595 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001596
1597 def test_read(self):
1598 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1599
1600 self.assertEqual(pair.read(3), b"abc")
1601 self.assertEqual(pair.read(1), b"d")
1602 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001603 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1604 self.assertEqual(pair.read(None), b"abc")
1605
1606 def test_readlines(self):
1607 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1608 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1609 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1610 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001611
1612 def test_read1(self):
1613 # .read1() is delegated to the underlying reader object, so this test
1614 # can be shallow.
1615 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1616
1617 self.assertEqual(pair.read1(3), b"abc")
1618
1619 def test_readinto(self):
1620 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1621
1622 data = bytearray(5)
1623 self.assertEqual(pair.readinto(data), 5)
1624 self.assertEqual(data, b"abcde")
1625
1626 def test_write(self):
1627 w = self.MockRawIO()
1628 pair = self.tp(self.MockRawIO(), w)
1629
1630 pair.write(b"abc")
1631 pair.flush()
1632 pair.write(b"def")
1633 pair.flush()
1634 self.assertEqual(w._write_stack, [b"abc", b"def"])
1635
1636 def test_peek(self):
1637 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1638
1639 self.assertTrue(pair.peek(3).startswith(b"abc"))
1640 self.assertEqual(pair.read(3), b"abc")
1641
1642 def test_readable(self):
1643 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1644 self.assertTrue(pair.readable())
1645
1646 def test_writeable(self):
1647 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1648 self.assertTrue(pair.writable())
1649
1650 def test_seekable(self):
1651 # BufferedRWPairs are never seekable, even if their readers and writers
1652 # are.
1653 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1654 self.assertFalse(pair.seekable())
1655
1656 # .flush() is delegated to the underlying writer object and has been
1657 # tested in the test_write method.
1658
1659 def test_close_and_closed(self):
1660 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1661 self.assertFalse(pair.closed)
1662 pair.close()
1663 self.assertTrue(pair.closed)
1664
Serhiy Storchaka7665be62015-03-24 23:21:57 +02001665 def test_reader_close_error_on_close(self):
1666 def reader_close():
1667 reader_non_existing
1668 reader = self.MockRawIO()
1669 reader.close = reader_close
1670 writer = self.MockRawIO()
1671 pair = self.tp(reader, writer)
1672 with self.assertRaises(NameError) as err:
1673 pair.close()
1674 self.assertIn('reader_non_existing', str(err.exception))
1675 self.assertTrue(pair.closed)
1676 self.assertFalse(reader.closed)
1677 self.assertTrue(writer.closed)
1678
1679 def test_writer_close_error_on_close(self):
1680 def writer_close():
1681 writer_non_existing
1682 reader = self.MockRawIO()
1683 writer = self.MockRawIO()
1684 writer.close = writer_close
1685 pair = self.tp(reader, writer)
1686 with self.assertRaises(NameError) as err:
1687 pair.close()
1688 self.assertIn('writer_non_existing', str(err.exception))
1689 self.assertFalse(pair.closed)
1690 self.assertTrue(reader.closed)
1691 self.assertFalse(writer.closed)
1692
1693 def test_reader_writer_close_error_on_close(self):
1694 def reader_close():
1695 reader_non_existing
1696 def writer_close():
1697 writer_non_existing
1698 reader = self.MockRawIO()
1699 reader.close = reader_close
1700 writer = self.MockRawIO()
1701 writer.close = writer_close
1702 pair = self.tp(reader, writer)
1703 with self.assertRaises(NameError) as err:
1704 pair.close()
1705 self.assertIn('reader_non_existing', str(err.exception))
1706 self.assertIsInstance(err.exception.__context__, NameError)
1707 self.assertIn('writer_non_existing', str(err.exception.__context__))
1708 self.assertFalse(pair.closed)
1709 self.assertFalse(reader.closed)
1710 self.assertFalse(writer.closed)
1711
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001712 def test_isatty(self):
1713 class SelectableIsAtty(MockRawIO):
1714 def __init__(self, isatty):
1715 MockRawIO.__init__(self)
1716 self._isatty = isatty
1717
1718 def isatty(self):
1719 return self._isatty
1720
1721 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1722 self.assertFalse(pair.isatty())
1723
1724 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1725 self.assertTrue(pair.isatty())
1726
1727 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1728 self.assertTrue(pair.isatty())
1729
1730 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1731 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001732
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04001733 def test_weakref_clearing(self):
1734 brw = self.tp(self.MockRawIO(), self.MockRawIO())
1735 ref = weakref.ref(brw)
1736 brw = None
1737 ref = None # Shouldn't segfault.
1738
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001739class CBufferedRWPairTest(BufferedRWPairTest):
1740 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001741
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001742class PyBufferedRWPairTest(BufferedRWPairTest):
1743 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001744
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001745
1746class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1747 read_mode = "rb+"
1748 write_mode = "wb+"
1749
1750 def test_constructor(self):
1751 BufferedReaderTest.test_constructor(self)
1752 BufferedWriterTest.test_constructor(self)
1753
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001754 def test_uninitialized(self):
1755 BufferedReaderTest.test_uninitialized(self)
1756 BufferedWriterTest.test_uninitialized(self)
1757
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001758 def test_read_and_write(self):
1759 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001760 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001761
1762 self.assertEqual(b"as", rw.read(2))
1763 rw.write(b"ddd")
1764 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001765 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001766 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001767 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001768
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001769 def test_seek_and_tell(self):
1770 raw = self.BytesIO(b"asdfghjkl")
1771 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001772
Ezio Melottib3aedd42010-11-20 19:04:17 +00001773 self.assertEqual(b"as", rw.read(2))
1774 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001775 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001776 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001777
Antoine Pitroue05565e2011-08-20 14:39:23 +02001778 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001779 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001780 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001781 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001782 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001783 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001784 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001785 self.assertEqual(7, rw.tell())
1786 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001787 rw.flush()
1788 self.assertEqual(b"asdf123fl", raw.getvalue())
1789
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001790 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001791
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001792 def check_flush_and_read(self, read_func):
1793 raw = self.BytesIO(b"abcdefghi")
1794 bufio = self.tp(raw)
1795
Ezio Melottib3aedd42010-11-20 19:04:17 +00001796 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001797 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001798 self.assertEqual(b"ef", read_func(bufio, 2))
1799 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001800 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001801 self.assertEqual(6, bufio.tell())
1802 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001803 raw.seek(0, 0)
1804 raw.write(b"XYZ")
1805 # flush() resets the read buffer
1806 bufio.flush()
1807 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001808 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001809
1810 def test_flush_and_read(self):
1811 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1812
1813 def test_flush_and_readinto(self):
1814 def _readinto(bufio, n=-1):
1815 b = bytearray(n if n >= 0 else 9999)
1816 n = bufio.readinto(b)
1817 return bytes(b[:n])
1818 self.check_flush_and_read(_readinto)
1819
1820 def test_flush_and_peek(self):
1821 def _peek(bufio, n=-1):
1822 # This relies on the fact that the buffer can contain the whole
1823 # raw stream, otherwise peek() can return less.
1824 b = bufio.peek(n)
1825 if n != -1:
1826 b = b[:n]
1827 bufio.seek(len(b), 1)
1828 return b
1829 self.check_flush_and_read(_peek)
1830
1831 def test_flush_and_write(self):
1832 raw = self.BytesIO(b"abcdefghi")
1833 bufio = self.tp(raw)
1834
1835 bufio.write(b"123")
1836 bufio.flush()
1837 bufio.write(b"45")
1838 bufio.flush()
1839 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001840 self.assertEqual(b"12345fghi", raw.getvalue())
1841 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001842
1843 def test_threads(self):
1844 BufferedReaderTest.test_threads(self)
1845 BufferedWriterTest.test_threads(self)
1846
1847 def test_writes_and_peek(self):
1848 def _peek(bufio):
1849 bufio.peek(1)
1850 self.check_writes(_peek)
1851 def _peek(bufio):
1852 pos = bufio.tell()
1853 bufio.seek(-1, 1)
1854 bufio.peek(1)
1855 bufio.seek(pos, 0)
1856 self.check_writes(_peek)
1857
1858 def test_writes_and_reads(self):
1859 def _read(bufio):
1860 bufio.seek(-1, 1)
1861 bufio.read(1)
1862 self.check_writes(_read)
1863
1864 def test_writes_and_read1s(self):
1865 def _read1(bufio):
1866 bufio.seek(-1, 1)
1867 bufio.read1(1)
1868 self.check_writes(_read1)
1869
1870 def test_writes_and_readintos(self):
1871 def _read(bufio):
1872 bufio.seek(-1, 1)
1873 bufio.readinto(bytearray(1))
1874 self.check_writes(_read)
1875
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001876 def test_write_after_readahead(self):
1877 # Issue #6629: writing after the buffer was filled by readahead should
1878 # first rewind the raw stream.
1879 for overwrite_size in [1, 5]:
1880 raw = self.BytesIO(b"A" * 10)
1881 bufio = self.tp(raw, 4)
1882 # Trigger readahead
1883 self.assertEqual(bufio.read(1), b"A")
1884 self.assertEqual(bufio.tell(), 1)
1885 # Overwriting should rewind the raw stream if it needs so
1886 bufio.write(b"B" * overwrite_size)
1887 self.assertEqual(bufio.tell(), overwrite_size + 1)
1888 # If the write size was smaller than the buffer size, flush() and
1889 # check that rewind happens.
1890 bufio.flush()
1891 self.assertEqual(bufio.tell(), overwrite_size + 1)
1892 s = raw.getvalue()
1893 self.assertEqual(s,
1894 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1895
Antoine Pitrou7c404892011-05-13 00:13:33 +02001896 def test_write_rewind_write(self):
1897 # Various combinations of reading / writing / seeking backwards / writing again
1898 def mutate(bufio, pos1, pos2):
1899 assert pos2 >= pos1
1900 # Fill the buffer
1901 bufio.seek(pos1)
1902 bufio.read(pos2 - pos1)
1903 bufio.write(b'\x02')
1904 # This writes earlier than the previous write, but still inside
1905 # the buffer.
1906 bufio.seek(pos1)
1907 bufio.write(b'\x01')
1908
1909 b = b"\x80\x81\x82\x83\x84"
1910 for i in range(0, len(b)):
1911 for j in range(i, len(b)):
1912 raw = self.BytesIO(b)
1913 bufio = self.tp(raw, 100)
1914 mutate(bufio, i, j)
1915 bufio.flush()
1916 expected = bytearray(b)
1917 expected[j] = 2
1918 expected[i] = 1
1919 self.assertEqual(raw.getvalue(), expected,
1920 "failed result for i=%d, j=%d" % (i, j))
1921
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001922 def test_truncate_after_read_or_write(self):
1923 raw = self.BytesIO(b"A" * 10)
1924 bufio = self.tp(raw, 100)
1925 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1926 self.assertEqual(bufio.truncate(), 2)
1927 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1928 self.assertEqual(bufio.truncate(), 4)
1929
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001930 def test_misbehaved_io(self):
1931 BufferedReaderTest.test_misbehaved_io(self)
1932 BufferedWriterTest.test_misbehaved_io(self)
1933
Antoine Pitroue05565e2011-08-20 14:39:23 +02001934 def test_interleaved_read_write(self):
1935 # Test for issue #12213
1936 with self.BytesIO(b'abcdefgh') as raw:
1937 with self.tp(raw, 100) as f:
1938 f.write(b"1")
1939 self.assertEqual(f.read(1), b'b')
1940 f.write(b'2')
1941 self.assertEqual(f.read1(1), b'd')
1942 f.write(b'3')
1943 buf = bytearray(1)
1944 f.readinto(buf)
1945 self.assertEqual(buf, b'f')
1946 f.write(b'4')
1947 self.assertEqual(f.peek(1), b'h')
1948 f.flush()
1949 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1950
1951 with self.BytesIO(b'abc') as raw:
1952 with self.tp(raw, 100) as f:
1953 self.assertEqual(f.read(1), b'a')
1954 f.write(b"2")
1955 self.assertEqual(f.read(1), b'c')
1956 f.flush()
1957 self.assertEqual(raw.getvalue(), b'a2c')
1958
1959 def test_interleaved_readline_write(self):
1960 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1961 with self.tp(raw) as f:
1962 f.write(b'1')
1963 self.assertEqual(f.readline(), b'b\n')
1964 f.write(b'2')
1965 self.assertEqual(f.readline(), b'def\n')
1966 f.write(b'3')
1967 self.assertEqual(f.readline(), b'\n')
1968 f.flush()
1969 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1970
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001971 # You can't construct a BufferedRandom over a non-seekable stream.
1972 test_unseekable = None
1973
R David Murray67bfe802013-02-23 21:51:05 -05001974
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001975class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001976 tp = io.BufferedRandom
1977
1978 def test_constructor(self):
1979 BufferedRandomTest.test_constructor(self)
1980 # The allocation can succeed on 32-bit builds, e.g. with more
1981 # than 2GB RAM and a 64-bit kernel.
1982 if sys.maxsize > 0x7FFFFFFF:
1983 rawio = self.MockRawIO()
1984 bufio = self.tp(rawio)
1985 self.assertRaises((OverflowError, MemoryError, ValueError),
1986 bufio.__init__, rawio, sys.maxsize)
1987
1988 def test_garbage_collection(self):
1989 CBufferedReaderTest.test_garbage_collection(self)
1990 CBufferedWriterTest.test_garbage_collection(self)
1991
R David Murray67bfe802013-02-23 21:51:05 -05001992 def test_args_error(self):
1993 # Issue #17275
1994 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
1995 self.tp(io.BytesIO(), 1024, 1024, 1024)
1996
1997
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001998class PyBufferedRandomTest(BufferedRandomTest):
1999 tp = pyio.BufferedRandom
2000
2001
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002002# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2003# properties:
2004# - A single output character can correspond to many bytes of input.
2005# - The number of input bytes to complete the character can be
2006# undetermined until the last input byte is received.
2007# - The number of input bytes can vary depending on previous input.
2008# - A single input byte can correspond to many characters of output.
2009# - The number of output characters can be undetermined until the
2010# last input byte is received.
2011# - The number of output characters can vary depending on previous input.
2012
2013class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2014 """
2015 For testing seek/tell behavior with a stateful, buffering decoder.
2016
2017 Input is a sequence of words. Words may be fixed-length (length set
2018 by input) or variable-length (period-terminated). In variable-length
2019 mode, extra periods are ignored. Possible words are:
2020 - 'i' followed by a number sets the input length, I (maximum 99).
2021 When I is set to 0, words are space-terminated.
2022 - 'o' followed by a number sets the output length, O (maximum 99).
2023 - Any other word is converted into a word followed by a period on
2024 the output. The output word consists of the input word truncated
2025 or padded out with hyphens to make its length equal to O. If O
2026 is 0, the word is output verbatim without truncating or padding.
2027 I and O are initially set to 1. When I changes, any buffered input is
2028 re-scanned according to the new I. EOF also terminates the last word.
2029 """
2030
2031 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002032 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002033 self.reset()
2034
2035 def __repr__(self):
2036 return '<SID %x>' % id(self)
2037
2038 def reset(self):
2039 self.i = 1
2040 self.o = 1
2041 self.buffer = bytearray()
2042
2043 def getstate(self):
2044 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2045 return bytes(self.buffer), i*100 + o
2046
2047 def setstate(self, state):
2048 buffer, io = state
2049 self.buffer = bytearray(buffer)
2050 i, o = divmod(io, 100)
2051 self.i, self.o = i ^ 1, o ^ 1
2052
2053 def decode(self, input, final=False):
2054 output = ''
2055 for b in input:
2056 if self.i == 0: # variable-length, terminated with period
2057 if b == ord('.'):
2058 if self.buffer:
2059 output += self.process_word()
2060 else:
2061 self.buffer.append(b)
2062 else: # fixed-length, terminate after self.i bytes
2063 self.buffer.append(b)
2064 if len(self.buffer) == self.i:
2065 output += self.process_word()
2066 if final and self.buffer: # EOF terminates the last word
2067 output += self.process_word()
2068 return output
2069
2070 def process_word(self):
2071 output = ''
2072 if self.buffer[0] == ord('i'):
2073 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2074 elif self.buffer[0] == ord('o'):
2075 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2076 else:
2077 output = self.buffer.decode('ascii')
2078 if len(output) < self.o:
2079 output += '-'*self.o # pad out with hyphens
2080 if self.o:
2081 output = output[:self.o] # truncate to output length
2082 output += '.'
2083 self.buffer = bytearray()
2084 return output
2085
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002086 codecEnabled = False
2087
2088 @classmethod
2089 def lookupTestDecoder(cls, name):
2090 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002091 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002092 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002093 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002094 incrementalencoder=None,
2095 streamreader=None, streamwriter=None,
2096 incrementaldecoder=cls)
2097
2098# Register the previous decoder for testing.
2099# Disabled by default, tests will enable it.
2100codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2101
2102
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002103class StatefulIncrementalDecoderTest(unittest.TestCase):
2104 """
2105 Make sure the StatefulIncrementalDecoder actually works.
2106 """
2107
2108 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002109 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002110 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002111 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002112 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002113 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002114 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002115 # I=0, O=6 (variable-length input, fixed-length output)
2116 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2117 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002118 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002119 # I=6, O=3 (fixed-length input > fixed-length output)
2120 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2121 # I=0, then 3; O=29, then 15 (with longer output)
2122 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2123 'a----------------------------.' +
2124 'b----------------------------.' +
2125 'cde--------------------------.' +
2126 'abcdefghijabcde.' +
2127 'a.b------------.' +
2128 '.c.------------.' +
2129 'd.e------------.' +
2130 'k--------------.' +
2131 'l--------------.' +
2132 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002133 ]
2134
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002135 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002136 # Try a few one-shot test cases.
2137 for input, eof, output in self.test_cases:
2138 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002139 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002140
2141 # Also test an unfinished decode, followed by forcing EOF.
2142 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002143 self.assertEqual(d.decode(b'oiabcd'), '')
2144 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002145
2146class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002147
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002148 def setUp(self):
2149 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2150 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002151 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002152
Guido van Rossumd0712812007-04-11 16:32:43 +00002153 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002154 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002155
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002156 def test_constructor(self):
2157 r = self.BytesIO(b"\xc3\xa9\n\n")
2158 b = self.BufferedReader(r, 1000)
2159 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002160 t.__init__(b, encoding="latin-1", newline="\r\n")
2161 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002162 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002163 t.__init__(b, encoding="utf-8", line_buffering=True)
2164 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002165 self.assertEqual(t.line_buffering, True)
2166 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002167 self.assertRaises(TypeError, t.__init__, b, newline=42)
2168 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2169
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002170 def test_uninitialized(self):
2171 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2172 del t
2173 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2174 self.assertRaises(Exception, repr, t)
2175 self.assertRaisesRegex((ValueError, AttributeError),
2176 'uninitialized|has no attribute',
2177 t.read, 0)
2178 t.__init__(self.MockRawIO())
2179 self.assertEqual(t.read(0), '')
2180
Nick Coghlana9b15242014-02-04 22:11:18 +10002181 def test_non_text_encoding_codecs_are_rejected(self):
2182 # Ensure the constructor complains if passed a codec that isn't
2183 # marked as a text encoding
2184 # http://bugs.python.org/issue20404
2185 r = self.BytesIO()
2186 b = self.BufferedWriter(r)
2187 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2188 self.TextIOWrapper(b, encoding="hex")
2189
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002190 def test_detach(self):
2191 r = self.BytesIO()
2192 b = self.BufferedWriter(r)
2193 t = self.TextIOWrapper(b)
2194 self.assertIs(t.detach(), b)
2195
2196 t = self.TextIOWrapper(b, encoding="ascii")
2197 t.write("howdy")
2198 self.assertFalse(r.getvalue())
2199 t.detach()
2200 self.assertEqual(r.getvalue(), b"howdy")
2201 self.assertRaises(ValueError, t.detach)
2202
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002203 # Operations independent of the detached stream should still work
2204 repr(t)
2205 self.assertEqual(t.encoding, "ascii")
2206 self.assertEqual(t.errors, "strict")
2207 self.assertFalse(t.line_buffering)
2208
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002209 def test_repr(self):
2210 raw = self.BytesIO("hello".encode("utf-8"))
2211 b = self.BufferedReader(raw)
2212 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002213 modname = self.TextIOWrapper.__module__
2214 self.assertEqual(repr(t),
2215 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2216 raw.name = "dummy"
2217 self.assertEqual(repr(t),
2218 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002219 t.mode = "r"
2220 self.assertEqual(repr(t),
2221 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002222 raw.name = b"dummy"
2223 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002224 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002225
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002226 t.buffer.detach()
2227 repr(t) # Should not raise an exception
2228
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002229 def test_line_buffering(self):
2230 r = self.BytesIO()
2231 b = self.BufferedWriter(r, 1000)
2232 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002233 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002234 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002235 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002236 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002237 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002238 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002239
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002240 def test_default_encoding(self):
2241 old_environ = dict(os.environ)
2242 try:
2243 # try to get a user preferred encoding different than the current
2244 # locale encoding to check that TextIOWrapper() uses the current
2245 # locale encoding and not the user preferred encoding
2246 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2247 if key in os.environ:
2248 del os.environ[key]
2249
2250 current_locale_encoding = locale.getpreferredencoding(False)
2251 b = self.BytesIO()
2252 t = self.TextIOWrapper(b)
2253 self.assertEqual(t.encoding, current_locale_encoding)
2254 finally:
2255 os.environ.clear()
2256 os.environ.update(old_environ)
2257
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002258 @support.cpython_only
Serhiy Storchaka78980432013-01-15 01:12:17 +02002259 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002260 # Issue 15989
2261 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002262 b = self.BytesIO()
2263 b.fileno = lambda: _testcapi.INT_MAX + 1
2264 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2265 b.fileno = lambda: _testcapi.UINT_MAX + 1
2266 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2267
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002268 def test_encoding(self):
2269 # Check the encoding attribute is always set, and valid
2270 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002271 t = self.TextIOWrapper(b, encoding="utf-8")
2272 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002273 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002274 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002275 codecs.lookup(t.encoding)
2276
2277 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002278 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002279 b = self.BytesIO(b"abc\n\xff\n")
2280 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002281 self.assertRaises(UnicodeError, t.read)
2282 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002283 b = self.BytesIO(b"abc\n\xff\n")
2284 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002285 self.assertRaises(UnicodeError, t.read)
2286 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002287 b = self.BytesIO(b"abc\n\xff\n")
2288 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002289 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002290 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002291 b = self.BytesIO(b"abc\n\xff\n")
2292 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002293 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002294
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002295 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002296 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002297 b = self.BytesIO()
2298 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002299 self.assertRaises(UnicodeError, t.write, "\xff")
2300 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002301 b = self.BytesIO()
2302 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002303 self.assertRaises(UnicodeError, t.write, "\xff")
2304 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002305 b = self.BytesIO()
2306 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002307 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002308 t.write("abc\xffdef\n")
2309 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002310 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002311 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002312 b = self.BytesIO()
2313 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002314 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002315 t.write("abc\xffdef\n")
2316 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002317 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002318
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002319 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002320 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2321
2322 tests = [
2323 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002324 [ '', input_lines ],
2325 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2326 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2327 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002328 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002329 encodings = (
2330 'utf-8', 'latin-1',
2331 'utf-16', 'utf-16-le', 'utf-16-be',
2332 'utf-32', 'utf-32-le', 'utf-32-be',
2333 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002334
Guido van Rossum8358db22007-08-18 21:39:55 +00002335 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002336 # character in TextIOWrapper._pending_line.
2337 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002338 # XXX: str.encode() should return bytes
2339 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002340 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002341 for bufsize in range(1, 10):
2342 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002343 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2344 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002345 encoding=encoding)
2346 if do_reads:
2347 got_lines = []
2348 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002349 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002350 if c2 == '':
2351 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002352 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002353 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002354 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002355 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002356
2357 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002358 self.assertEqual(got_line, exp_line)
2359 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002360
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002361 def test_newlines_input(self):
2362 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002363 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2364 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002365 (None, normalized.decode("ascii").splitlines(keepends=True)),
2366 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002367 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2368 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2369 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002370 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002371 buf = self.BytesIO(testdata)
2372 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002373 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002374 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002375 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002376
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002377 def test_newlines_output(self):
2378 testdict = {
2379 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2380 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2381 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2382 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2383 }
2384 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2385 for newline, expected in tests:
2386 buf = self.BytesIO()
2387 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2388 txt.write("AAA\nB")
2389 txt.write("BB\nCCC\n")
2390 txt.write("X\rY\r\nZ")
2391 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002392 self.assertEqual(buf.closed, False)
2393 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002394
2395 def test_destructor(self):
2396 l = []
2397 base = self.BytesIO
2398 class MyBytesIO(base):
2399 def close(self):
2400 l.append(self.getvalue())
2401 base.close(self)
2402 b = MyBytesIO()
2403 t = self.TextIOWrapper(b, encoding="ascii")
2404 t.write("abc")
2405 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002406 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002407 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002408
2409 def test_override_destructor(self):
2410 record = []
2411 class MyTextIO(self.TextIOWrapper):
2412 def __del__(self):
2413 record.append(1)
2414 try:
2415 f = super().__del__
2416 except AttributeError:
2417 pass
2418 else:
2419 f()
2420 def close(self):
2421 record.append(2)
2422 super().close()
2423 def flush(self):
2424 record.append(3)
2425 super().flush()
2426 b = self.BytesIO()
2427 t = MyTextIO(b, encoding="ascii")
2428 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002429 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002430 self.assertEqual(record, [1, 2, 3])
2431
2432 def test_error_through_destructor(self):
2433 # Test that the exception state is not modified by a destructor,
2434 # even if close() fails.
2435 rawio = self.CloseFailureIO()
2436 def f():
2437 self.TextIOWrapper(rawio).xyzzy
2438 with support.captured_output("stderr") as s:
2439 self.assertRaises(AttributeError, f)
2440 s = s.getvalue().strip()
2441 if s:
2442 # The destructor *may* have printed an unraisable error, check it
2443 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002444 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002445 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002446
Guido van Rossum9b76da62007-04-11 01:09:03 +00002447 # Systematic tests of the text I/O API
2448
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002449 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002450 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002451 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002452 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002453 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002454 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002455 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002456 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002457 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002458 self.assertEqual(f.tell(), 0)
2459 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002460 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002461 self.assertEqual(f.seek(0), 0)
2462 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002463 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002464 self.assertEqual(f.read(2), "ab")
2465 self.assertEqual(f.read(1), "c")
2466 self.assertEqual(f.read(1), "")
2467 self.assertEqual(f.read(), "")
2468 self.assertEqual(f.tell(), cookie)
2469 self.assertEqual(f.seek(0), 0)
2470 self.assertEqual(f.seek(0, 2), cookie)
2471 self.assertEqual(f.write("def"), 3)
2472 self.assertEqual(f.seek(cookie), cookie)
2473 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002474 if enc.startswith("utf"):
2475 self.multi_line_test(f, enc)
2476 f.close()
2477
2478 def multi_line_test(self, f, enc):
2479 f.seek(0)
2480 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002481 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002482 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002483 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002484 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002485 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002486 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002487 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002488 wlines.append((f.tell(), line))
2489 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002490 f.seek(0)
2491 rlines = []
2492 while True:
2493 pos = f.tell()
2494 line = f.readline()
2495 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002496 break
2497 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002498 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002499
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002500 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002501 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002502 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002503 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002504 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002505 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002506 p2 = f.tell()
2507 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002508 self.assertEqual(f.tell(), p0)
2509 self.assertEqual(f.readline(), "\xff\n")
2510 self.assertEqual(f.tell(), p1)
2511 self.assertEqual(f.readline(), "\xff\n")
2512 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002513 f.seek(0)
2514 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002515 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002516 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002517 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002518 f.close()
2519
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002520 def test_seeking(self):
2521 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002522 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002523 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002524 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002525 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002526 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002527 suffix = bytes(u_suffix.encode("utf-8"))
2528 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002529 with self.open(support.TESTFN, "wb") as f:
2530 f.write(line*2)
2531 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2532 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002533 self.assertEqual(s, str(prefix, "ascii"))
2534 self.assertEqual(f.tell(), prefix_size)
2535 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002536
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002537 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002538 # Regression test for a specific bug
2539 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002540 with self.open(support.TESTFN, "wb") as f:
2541 f.write(data)
2542 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2543 f._CHUNK_SIZE # Just test that it exists
2544 f._CHUNK_SIZE = 2
2545 f.readline()
2546 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002547
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002548 def test_seek_and_tell(self):
2549 #Test seek/tell using the StatefulIncrementalDecoder.
2550 # Make test faster by doing smaller seeks
2551 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002552
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002553 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002554 """Tell/seek to various points within a data stream and ensure
2555 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002556 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002557 f.write(data)
2558 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002559 f = self.open(support.TESTFN, encoding='test_decoder')
2560 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002561 decoded = f.read()
2562 f.close()
2563
Neal Norwitze2b07052008-03-18 19:52:05 +00002564 for i in range(min_pos, len(decoded) + 1): # seek positions
2565 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002566 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002567 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002568 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002569 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002570 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002571 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002572 f.close()
2573
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002574 # Enable the test decoder.
2575 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002576
2577 # Run the tests.
2578 try:
2579 # Try each test case.
2580 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002581 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002582
2583 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002584 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2585 offset = CHUNK_SIZE - len(input)//2
2586 prefix = b'.'*offset
2587 # Don't bother seeking into the prefix (takes too long).
2588 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002589 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002590
2591 # Ensure our test decoder won't interfere with subsequent tests.
2592 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002593 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002594
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002595 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002596 data = "1234567890"
2597 tests = ("utf-16",
2598 "utf-16-le",
2599 "utf-16-be",
2600 "utf-32",
2601 "utf-32-le",
2602 "utf-32-be")
2603 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002604 buf = self.BytesIO()
2605 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002606 # Check if the BOM is written only once (see issue1753).
2607 f.write(data)
2608 f.write(data)
2609 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002610 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002611 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002612 self.assertEqual(f.read(), data * 2)
2613 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002614
Benjamin Petersona1b49012009-03-31 23:11:32 +00002615 def test_unreadable(self):
2616 class UnReadable(self.BytesIO):
2617 def readable(self):
2618 return False
2619 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002620 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002621
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002622 def test_read_one_by_one(self):
2623 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002624 reads = ""
2625 while True:
2626 c = txt.read(1)
2627 if not c:
2628 break
2629 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002630 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002631
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002632 def test_readlines(self):
2633 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2634 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2635 txt.seek(0)
2636 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2637 txt.seek(0)
2638 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2639
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002640 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002641 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002642 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002643 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002644 reads = ""
2645 while True:
2646 c = txt.read(128)
2647 if not c:
2648 break
2649 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002650 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002651
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002652 def test_writelines(self):
2653 l = ['ab', 'cd', 'ef']
2654 buf = self.BytesIO()
2655 txt = self.TextIOWrapper(buf)
2656 txt.writelines(l)
2657 txt.flush()
2658 self.assertEqual(buf.getvalue(), b'abcdef')
2659
2660 def test_writelines_userlist(self):
2661 l = UserList(['ab', 'cd', 'ef'])
2662 buf = self.BytesIO()
2663 txt = self.TextIOWrapper(buf)
2664 txt.writelines(l)
2665 txt.flush()
2666 self.assertEqual(buf.getvalue(), b'abcdef')
2667
2668 def test_writelines_error(self):
2669 txt = self.TextIOWrapper(self.BytesIO())
2670 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2671 self.assertRaises(TypeError, txt.writelines, None)
2672 self.assertRaises(TypeError, txt.writelines, b'abc')
2673
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002674 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002675 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002676
2677 # read one char at a time
2678 reads = ""
2679 while True:
2680 c = txt.read(1)
2681 if not c:
2682 break
2683 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002684 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002685
2686 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002687 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002688 txt._CHUNK_SIZE = 4
2689
2690 reads = ""
2691 while True:
2692 c = txt.read(4)
2693 if not c:
2694 break
2695 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002696 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002697
2698 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002699 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002700 txt._CHUNK_SIZE = 4
2701
2702 reads = txt.read(4)
2703 reads += txt.read(4)
2704 reads += txt.readline()
2705 reads += txt.readline()
2706 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002707 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002708
2709 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002710 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002711 txt._CHUNK_SIZE = 4
2712
2713 reads = txt.read(4)
2714 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002715 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002716
2717 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002718 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002719 txt._CHUNK_SIZE = 4
2720
2721 reads = txt.read(4)
2722 pos = txt.tell()
2723 txt.seek(0)
2724 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002725 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002726
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002727 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002728 buffer = self.BytesIO(self.testdata)
2729 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002730
2731 self.assertEqual(buffer.seekable(), txt.seekable())
2732
Antoine Pitroue4501852009-05-14 18:55:55 +00002733 def test_append_bom(self):
2734 # The BOM is not written again when appending to a non-empty file
2735 filename = support.TESTFN
2736 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2737 with self.open(filename, 'w', encoding=charset) as f:
2738 f.write('aaa')
2739 pos = f.tell()
2740 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002741 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002742
2743 with self.open(filename, 'a', encoding=charset) as f:
2744 f.write('xxx')
2745 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002746 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002747
2748 def test_seek_bom(self):
2749 # Same test, but when seeking manually
2750 filename = support.TESTFN
2751 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2752 with self.open(filename, 'w', encoding=charset) as f:
2753 f.write('aaa')
2754 pos = f.tell()
2755 with self.open(filename, 'r+', encoding=charset) as f:
2756 f.seek(pos)
2757 f.write('zzz')
2758 f.seek(0)
2759 f.write('bbb')
2760 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002761 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002762
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02002763 def test_seek_append_bom(self):
2764 # Same test, but first seek to the start and then to the end
2765 filename = support.TESTFN
2766 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2767 with self.open(filename, 'w', encoding=charset) as f:
2768 f.write('aaa')
2769 with self.open(filename, 'a', encoding=charset) as f:
2770 f.seek(0)
2771 f.seek(0, self.SEEK_END)
2772 f.write('xxx')
2773 with self.open(filename, 'rb') as f:
2774 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2775
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002776 def test_errors_property(self):
2777 with self.open(support.TESTFN, "w") as f:
2778 self.assertEqual(f.errors, "strict")
2779 with self.open(support.TESTFN, "w", errors="replace") as f:
2780 self.assertEqual(f.errors, "replace")
2781
Brett Cannon31f59292011-02-21 19:29:56 +00002782 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002783 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002784 def test_threads_write(self):
2785 # Issue6750: concurrent writes could duplicate data
2786 event = threading.Event()
2787 with self.open(support.TESTFN, "w", buffering=1) as f:
2788 def run(n):
2789 text = "Thread%03d\n" % n
2790 event.wait()
2791 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03002792 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002793 for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03002794 with support.start_threads(threads, event.set):
2795 time.sleep(0.02)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002796 with self.open(support.TESTFN) as f:
2797 content = f.read()
2798 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002799 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002800
Antoine Pitrou6be88762010-05-03 16:48:20 +00002801 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002802 # Test that text file is closed despite failed flush
2803 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00002804 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002805 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00002806 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002807 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002808 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002809 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002810 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002811 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002812 self.assertTrue(txt.buffer.closed)
2813 self.assertTrue(closed) # flush() called
2814 self.assertFalse(closed[0]) # flush() called before file closed
2815 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02002816 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00002817
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03002818 def test_close_error_on_close(self):
2819 buffer = self.BytesIO(self.testdata)
2820 def bad_flush():
2821 raise OSError('flush')
2822 def bad_close():
2823 raise OSError('close')
2824 buffer.close = bad_close
2825 txt = self.TextIOWrapper(buffer, encoding="ascii")
2826 txt.flush = bad_flush
2827 with self.assertRaises(OSError) as err: # exception not swallowed
2828 txt.close()
2829 self.assertEqual(err.exception.args, ('close',))
2830 self.assertIsInstance(err.exception.__context__, OSError)
2831 self.assertEqual(err.exception.__context__.args, ('flush',))
2832 self.assertFalse(txt.closed)
2833
2834 def test_nonnormalized_close_error_on_close(self):
2835 # Issue #21677
2836 buffer = self.BytesIO(self.testdata)
2837 def bad_flush():
2838 raise non_existing_flush
2839 def bad_close():
2840 raise non_existing_close
2841 buffer.close = bad_close
2842 txt = self.TextIOWrapper(buffer, encoding="ascii")
2843 txt.flush = bad_flush
2844 with self.assertRaises(NameError) as err: # exception not swallowed
2845 txt.close()
2846 self.assertIn('non_existing_close', str(err.exception))
2847 self.assertIsInstance(err.exception.__context__, NameError)
2848 self.assertIn('non_existing_flush', str(err.exception.__context__))
2849 self.assertFalse(txt.closed)
2850
Antoine Pitrou6be88762010-05-03 16:48:20 +00002851 def test_multi_close(self):
2852 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2853 txt.close()
2854 txt.close()
2855 txt.close()
2856 self.assertRaises(ValueError, txt.flush)
2857
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002858 def test_unseekable(self):
2859 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2860 self.assertRaises(self.UnsupportedOperation, txt.tell)
2861 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2862
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002863 def test_readonly_attributes(self):
2864 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2865 buf = self.BytesIO(self.testdata)
2866 with self.assertRaises(AttributeError):
2867 txt.buffer = buf
2868
Antoine Pitroue96ec682011-07-23 21:46:35 +02002869 def test_rawio(self):
2870 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2871 # that subprocess.Popen() can have the required unbuffered
2872 # semantics with universal_newlines=True.
2873 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2874 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2875 # Reads
2876 self.assertEqual(txt.read(4), 'abcd')
2877 self.assertEqual(txt.readline(), 'efghi\n')
2878 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2879
2880 def test_rawio_write_through(self):
2881 # Issue #12591: with write_through=True, writes don't need a flush
2882 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2883 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2884 write_through=True)
2885 txt.write('1')
2886 txt.write('23\n4')
2887 txt.write('5')
2888 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2889
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02002890 def test_bufio_write_through(self):
2891 # Issue #21396: write_through=True doesn't force a flush()
2892 # on the underlying binary buffered object.
2893 flush_called, write_called = [], []
2894 class BufferedWriter(self.BufferedWriter):
2895 def flush(self, *args, **kwargs):
2896 flush_called.append(True)
2897 return super().flush(*args, **kwargs)
2898 def write(self, *args, **kwargs):
2899 write_called.append(True)
2900 return super().write(*args, **kwargs)
2901
2902 rawio = self.BytesIO()
2903 data = b"a"
2904 bufio = BufferedWriter(rawio, len(data)*2)
2905 textio = self.TextIOWrapper(bufio, encoding='ascii',
2906 write_through=True)
2907 # write to the buffered io but don't overflow the buffer
2908 text = data.decode('ascii')
2909 textio.write(text)
2910
2911 # buffer.flush is not called with write_through=True
2912 self.assertFalse(flush_called)
2913 # buffer.write *is* called with write_through=True
2914 self.assertTrue(write_called)
2915 self.assertEqual(rawio.getvalue(), b"") # no flush
2916
2917 write_called = [] # reset
2918 textio.write(text * 10) # total content is larger than bufio buffer
2919 self.assertTrue(write_called)
2920 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
2921
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002922 def test_read_nonbytes(self):
2923 # Issue #17106
2924 # Crash when underlying read() returns non-bytes
2925 t = self.TextIOWrapper(self.StringIO('a'))
2926 self.assertRaises(TypeError, t.read, 1)
2927 t = self.TextIOWrapper(self.StringIO('a'))
2928 self.assertRaises(TypeError, t.readline)
2929 t = self.TextIOWrapper(self.StringIO('a'))
2930 self.assertRaises(TypeError, t.read)
2931
2932 def test_illegal_decoder(self):
2933 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10002934 # Bypass the early encoding check added in issue 20404
2935 def _make_illegal_wrapper():
2936 quopri = codecs.lookup("quopri")
2937 quopri._is_text_encoding = True
2938 try:
2939 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2940 newline='\n', encoding="quopri")
2941 finally:
2942 quopri._is_text_encoding = False
2943 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002944 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10002945 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002946 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10002947 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002948 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10002949 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002950 self.assertRaises(TypeError, t.read)
2951
Antoine Pitrou712cb732013-12-21 15:51:54 +01002952 def _check_create_at_shutdown(self, **kwargs):
2953 # Issue #20037: creating a TextIOWrapper at shutdown
2954 # shouldn't crash the interpreter.
2955 iomod = self.io.__name__
2956 code = """if 1:
2957 import codecs
2958 import {iomod} as io
2959
2960 # Avoid looking up codecs at shutdown
2961 codecs.lookup('utf-8')
2962
2963 class C:
2964 def __init__(self):
2965 self.buf = io.BytesIO()
2966 def __del__(self):
2967 io.TextIOWrapper(self.buf, **{kwargs})
2968 print("ok")
2969 c = C()
2970 """.format(iomod=iomod, kwargs=kwargs)
2971 return assert_python_ok("-c", code)
2972
2973 def test_create_at_shutdown_without_encoding(self):
2974 rc, out, err = self._check_create_at_shutdown()
2975 if err:
2976 # Can error out with a RuntimeError if the module state
2977 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10002978 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01002979 else:
2980 self.assertEqual("ok", out.decode().strip())
2981
2982 def test_create_at_shutdown_with_encoding(self):
2983 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
2984 errors='strict')
2985 self.assertFalse(err)
2986 self.assertEqual("ok", out.decode().strip())
2987
Antoine Pitroub8503892014-04-29 10:14:02 +02002988 def test_read_byteslike(self):
2989 r = MemviewBytesIO(b'Just some random string\n')
2990 t = self.TextIOWrapper(r, 'utf-8')
2991
2992 # TextIOwrapper will not read the full string, because
2993 # we truncate it to a multiple of the native int size
2994 # so that we can construct a more complex memoryview.
2995 bytes_val = _to_memoryview(r.getvalue()).tobytes()
2996
2997 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
2998
Benjamin Peterson6c14f232014-11-12 10:19:46 -05002999 def test_issue22849(self):
3000 class F(object):
3001 def readable(self): return True
3002 def writable(self): return True
3003 def seekable(self): return True
3004
3005 for i in range(10):
3006 try:
3007 self.TextIOWrapper(F(), encoding='utf-8')
3008 except Exception:
3009 pass
3010
3011 F.tell = lambda x: 0
3012 t = self.TextIOWrapper(F(), encoding='utf-8')
3013
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003014
Antoine Pitroub8503892014-04-29 10:14:02 +02003015class MemviewBytesIO(io.BytesIO):
3016 '''A BytesIO object whose read method returns memoryviews
3017 rather than bytes'''
3018
3019 def read1(self, len_):
3020 return _to_memoryview(super().read1(len_))
3021
3022 def read(self, len_):
3023 return _to_memoryview(super().read(len_))
3024
3025def _to_memoryview(buf):
3026 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3027
3028 arr = array.array('i')
3029 idx = len(buf) - len(buf) % arr.itemsize
3030 arr.frombytes(buf[:idx])
3031 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003032
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003033
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003034class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003035 io = io
Nick Coghlana9b15242014-02-04 22:11:18 +10003036 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003037
3038 def test_initialization(self):
3039 r = self.BytesIO(b"\xc3\xa9\n\n")
3040 b = self.BufferedReader(r, 1000)
3041 t = self.TextIOWrapper(b)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003042 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
3043 self.assertRaises(ValueError, t.read)
3044
Benjamin Peterson10e76b62014-12-21 20:51:50 -06003045 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3046 self.assertRaises(Exception, repr, t)
3047
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003048 def test_garbage_collection(self):
3049 # C TextIOWrapper objects are collected, and collecting them flushes
3050 # all data to disk.
3051 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02003052 with support.check_warnings(('', ResourceWarning)):
3053 rawio = io.FileIO(support.TESTFN, "wb")
3054 b = self.BufferedWriter(rawio)
3055 t = self.TextIOWrapper(b, encoding="ascii")
3056 t.write("456def")
3057 t.x = t
3058 wr = weakref.ref(t)
3059 del t
3060 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003061 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00003062 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003063 self.assertEqual(f.read(), b"456def")
3064
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003065 def test_rwpair_cleared_before_textio(self):
3066 # Issue 13070: TextIOWrapper's finalization would crash when called
3067 # after the reference to the underlying BufferedRWPair's writer got
3068 # cleared by the GC.
3069 for i in range(1000):
3070 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3071 t1 = self.TextIOWrapper(b1, encoding="ascii")
3072 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3073 t2 = self.TextIOWrapper(b2, encoding="ascii")
3074 # circular references
3075 t1.buddy = t2
3076 t2.buddy = t1
3077 support.gc_collect()
3078
3079
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003080class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003081 io = pyio
Serhiy Storchakad667d722014-02-10 19:09:19 +02003082 #shutdown_error = "LookupError: unknown encoding: ascii"
3083 shutdown_error = "TypeError: 'NoneType' object is not iterable"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003084
3085
3086class IncrementalNewlineDecoderTest(unittest.TestCase):
3087
3088 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003089 # UTF-8 specific tests for a newline decoder
3090 def _check_decode(b, s, **kwargs):
3091 # We exercise getstate() / setstate() as well as decode()
3092 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003093 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003094 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003095 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003096
Antoine Pitrou180a3362008-12-14 16:36:46 +00003097 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003098
Antoine Pitrou180a3362008-12-14 16:36:46 +00003099 _check_decode(b'\xe8', "")
3100 _check_decode(b'\xa2', "")
3101 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003102
Antoine Pitrou180a3362008-12-14 16:36:46 +00003103 _check_decode(b'\xe8', "")
3104 _check_decode(b'\xa2', "")
3105 _check_decode(b'\x88', "\u8888")
3106
3107 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003108 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3109
Antoine Pitrou180a3362008-12-14 16:36:46 +00003110 decoder.reset()
3111 _check_decode(b'\n', "\n")
3112 _check_decode(b'\r', "")
3113 _check_decode(b'', "\n", final=True)
3114 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003115
Antoine Pitrou180a3362008-12-14 16:36:46 +00003116 _check_decode(b'\r', "")
3117 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003118
Antoine Pitrou180a3362008-12-14 16:36:46 +00003119 _check_decode(b'\r\r\n', "\n\n")
3120 _check_decode(b'\r', "")
3121 _check_decode(b'\r', "\n")
3122 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003123
Antoine Pitrou180a3362008-12-14 16:36:46 +00003124 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3125 _check_decode(b'\xe8\xa2\x88', "\u8888")
3126 _check_decode(b'\n', "\n")
3127 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3128 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003129
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003130 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003131 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003132 if encoding is not None:
3133 encoder = codecs.getincrementalencoder(encoding)()
3134 def _decode_bytewise(s):
3135 # Decode one byte at a time
3136 for b in encoder.encode(s):
3137 result.append(decoder.decode(bytes([b])))
3138 else:
3139 encoder = None
3140 def _decode_bytewise(s):
3141 # Decode one char at a time
3142 for c in s:
3143 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003144 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003145 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003146 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003147 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003148 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003149 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003150 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003151 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003152 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003153 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003154 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003155 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003156 input = "abc"
3157 if encoder is not None:
3158 encoder.reset()
3159 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003160 self.assertEqual(decoder.decode(input), "abc")
3161 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003162
3163 def test_newline_decoder(self):
3164 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003165 # None meaning the IncrementalNewlineDecoder takes unicode input
3166 # rather than bytes input
3167 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003168 'utf-16', 'utf-16-le', 'utf-16-be',
3169 'utf-32', 'utf-32-le', 'utf-32-be',
3170 )
3171 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003172 decoder = enc and codecs.getincrementaldecoder(enc)()
3173 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3174 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003175 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003176 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3177 self.check_newline_decoding_utf8(decoder)
3178
Antoine Pitrou66913e22009-03-06 23:40:56 +00003179 def test_newline_bytes(self):
3180 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3181 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003182 self.assertEqual(dec.newlines, None)
3183 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3184 self.assertEqual(dec.newlines, None)
3185 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3186 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003187 dec = self.IncrementalNewlineDecoder(None, translate=False)
3188 _check(dec)
3189 dec = self.IncrementalNewlineDecoder(None, translate=True)
3190 _check(dec)
3191
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003192class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3193 pass
3194
3195class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3196 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003197
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003198
Guido van Rossum01a27522007-03-07 01:00:12 +00003199# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003200
Guido van Rossum5abbf752007-08-27 17:39:33 +00003201class MiscIOTest(unittest.TestCase):
3202
Barry Warsaw40e82462008-11-20 20:14:50 +00003203 def tearDown(self):
3204 support.unlink(support.TESTFN)
3205
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003206 def test___all__(self):
3207 for name in self.io.__all__:
3208 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003209 self.assertIsNotNone(obj, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003210 if name == "open":
3211 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003212 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003213 self.assertTrue(issubclass(obj, Exception), name)
3214 elif not name.startswith("SEEK_"):
3215 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003216
Barry Warsaw40e82462008-11-20 20:14:50 +00003217 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003218 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003219 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003220 f.close()
3221
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003222 with support.check_warnings(('', DeprecationWarning)):
3223 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003224 self.assertEqual(f.name, support.TESTFN)
3225 self.assertEqual(f.buffer.name, support.TESTFN)
3226 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3227 self.assertEqual(f.mode, "U")
3228 self.assertEqual(f.buffer.mode, "rb")
3229 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003230 f.close()
3231
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003232 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003233 self.assertEqual(f.mode, "w+")
3234 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3235 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003236
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003237 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003238 self.assertEqual(g.mode, "wb")
3239 self.assertEqual(g.raw.mode, "wb")
3240 self.assertEqual(g.name, f.fileno())
3241 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003242 f.close()
3243 g.close()
3244
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003245 def test_io_after_close(self):
3246 for kwargs in [
3247 {"mode": "w"},
3248 {"mode": "wb"},
3249 {"mode": "w", "buffering": 1},
3250 {"mode": "w", "buffering": 2},
3251 {"mode": "wb", "buffering": 0},
3252 {"mode": "r"},
3253 {"mode": "rb"},
3254 {"mode": "r", "buffering": 1},
3255 {"mode": "r", "buffering": 2},
3256 {"mode": "rb", "buffering": 0},
3257 {"mode": "w+"},
3258 {"mode": "w+b"},
3259 {"mode": "w+", "buffering": 1},
3260 {"mode": "w+", "buffering": 2},
3261 {"mode": "w+b", "buffering": 0},
3262 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003263 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003264 f.close()
3265 self.assertRaises(ValueError, f.flush)
3266 self.assertRaises(ValueError, f.fileno)
3267 self.assertRaises(ValueError, f.isatty)
3268 self.assertRaises(ValueError, f.__iter__)
3269 if hasattr(f, "peek"):
3270 self.assertRaises(ValueError, f.peek, 1)
3271 self.assertRaises(ValueError, f.read)
3272 if hasattr(f, "read1"):
3273 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003274 if hasattr(f, "readall"):
3275 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003276 if hasattr(f, "readinto"):
3277 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003278 if hasattr(f, "readinto1"):
3279 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003280 self.assertRaises(ValueError, f.readline)
3281 self.assertRaises(ValueError, f.readlines)
3282 self.assertRaises(ValueError, f.seek, 0)
3283 self.assertRaises(ValueError, f.tell)
3284 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003285 self.assertRaises(ValueError, f.write,
3286 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003287 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003288 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003289
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003290 def test_blockingioerror(self):
3291 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003292 class C(str):
3293 pass
3294 c = C("")
3295 b = self.BlockingIOError(1, c)
3296 c.b = b
3297 b.c = c
3298 wr = weakref.ref(c)
3299 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003300 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003301 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003302
3303 def test_abcs(self):
3304 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003305 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3306 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3307 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3308 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003309
3310 def _check_abc_inheritance(self, abcmodule):
3311 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003312 self.assertIsInstance(f, abcmodule.IOBase)
3313 self.assertIsInstance(f, abcmodule.RawIOBase)
3314 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3315 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003316 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003317 self.assertIsInstance(f, abcmodule.IOBase)
3318 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3319 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3320 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003321 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003322 self.assertIsInstance(f, abcmodule.IOBase)
3323 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3324 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3325 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003326
3327 def test_abc_inheritance(self):
3328 # Test implementations inherit from their respective ABCs
3329 self._check_abc_inheritance(self)
3330
3331 def test_abc_inheritance_official(self):
3332 # Test implementations inherit from the official ABCs of the
3333 # baseline "io" module.
3334 self._check_abc_inheritance(io)
3335
Antoine Pitroue033e062010-10-29 10:38:18 +00003336 def _check_warn_on_dealloc(self, *args, **kwargs):
3337 f = open(*args, **kwargs)
3338 r = repr(f)
3339 with self.assertWarns(ResourceWarning) as cm:
3340 f = None
3341 support.gc_collect()
3342 self.assertIn(r, str(cm.warning.args[0]))
3343
3344 def test_warn_on_dealloc(self):
3345 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3346 self._check_warn_on_dealloc(support.TESTFN, "wb")
3347 self._check_warn_on_dealloc(support.TESTFN, "w")
3348
3349 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3350 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003351 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003352 for fd in fds:
3353 try:
3354 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003355 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003356 if e.errno != errno.EBADF:
3357 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003358 self.addCleanup(cleanup_fds)
3359 r, w = os.pipe()
3360 fds += r, w
3361 self._check_warn_on_dealloc(r, *args, **kwargs)
3362 # When using closefd=False, there's no warning
3363 r, w = os.pipe()
3364 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02003365 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00003366 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00003367
3368 def test_warn_on_dealloc_fd(self):
3369 self._check_warn_on_dealloc_fd("rb", buffering=0)
3370 self._check_warn_on_dealloc_fd("rb")
3371 self._check_warn_on_dealloc_fd("r")
3372
3373
Antoine Pitrou243757e2010-11-05 21:15:39 +00003374 def test_pickling(self):
3375 # Pickling file objects is forbidden
3376 for kwargs in [
3377 {"mode": "w"},
3378 {"mode": "wb"},
3379 {"mode": "wb", "buffering": 0},
3380 {"mode": "r"},
3381 {"mode": "rb"},
3382 {"mode": "rb", "buffering": 0},
3383 {"mode": "w+"},
3384 {"mode": "w+b"},
3385 {"mode": "w+b", "buffering": 0},
3386 ]:
3387 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3388 with self.open(support.TESTFN, **kwargs) as f:
3389 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3390
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003391 def test_nonblock_pipe_write_bigbuf(self):
3392 self._test_nonblock_pipe_write(16*1024)
3393
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003394 def test_nonblock_pipe_write_smallbuf(self):
3395 self._test_nonblock_pipe_write(1024)
3396
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003397 @unittest.skipUnless(hasattr(os, 'set_blocking'),
3398 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003399 def _test_nonblock_pipe_write(self, bufsize):
3400 sent = []
3401 received = []
3402 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003403 os.set_blocking(r, False)
3404 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003405
3406 # To exercise all code paths in the C implementation we need
3407 # to play with buffer sizes. For instance, if we choose a
3408 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3409 # then we will never get a partial write of the buffer.
3410 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3411 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3412
3413 with rf, wf:
3414 for N in 9999, 73, 7574:
3415 try:
3416 i = 0
3417 while True:
3418 msg = bytes([i % 26 + 97]) * N
3419 sent.append(msg)
3420 wf.write(msg)
3421 i += 1
3422
3423 except self.BlockingIOError as e:
3424 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003425 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003426 sent[-1] = sent[-1][:e.characters_written]
3427 received.append(rf.read())
3428 msg = b'BLOCKED'
3429 wf.write(msg)
3430 sent.append(msg)
3431
3432 while True:
3433 try:
3434 wf.flush()
3435 break
3436 except self.BlockingIOError as e:
3437 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003438 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003439 self.assertEqual(e.characters_written, 0)
3440 received.append(rf.read())
3441
3442 received += iter(rf.read, None)
3443
3444 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003445 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003446 self.assertTrue(wf.closed)
3447 self.assertTrue(rf.closed)
3448
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003449 def test_create_fail(self):
3450 # 'x' mode fails if file is existing
3451 with self.open(support.TESTFN, 'w'):
3452 pass
3453 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3454
3455 def test_create_writes(self):
3456 # 'x' mode opens for writing
3457 with self.open(support.TESTFN, 'xb') as f:
3458 f.write(b"spam")
3459 with self.open(support.TESTFN, 'rb') as f:
3460 self.assertEqual(b"spam", f.read())
3461
Christian Heimes7b648752012-09-10 14:48:43 +02003462 def test_open_allargs(self):
3463 # there used to be a buffer overflow in the parser for rawmode
3464 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3465
3466
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003467class CMiscIOTest(MiscIOTest):
3468 io = io
3469
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03003470 def test_readinto_buffer_overflow(self):
3471 # Issue #18025
3472 class BadReader(self.io.BufferedIOBase):
3473 def read(self, n=-1):
3474 return b'x' * 10**6
3475 bufio = BadReader()
3476 b = bytearray(2)
3477 self.assertRaises(ValueError, bufio.readinto, b)
3478
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003479 @unittest.skipUnless(threading, 'Threading required for this test.')
3480 def check_daemon_threads_shutdown_deadlock(self, stream_name):
3481 # Issue #23309: deadlocks at shutdown should be avoided when a
3482 # daemon thread and the main thread both write to a file.
3483 code = """if 1:
3484 import sys
3485 import time
3486 import threading
3487
3488 file = sys.{stream_name}
3489
3490 def run():
3491 while True:
3492 file.write('.')
3493 file.flush()
3494
3495 thread = threading.Thread(target=run)
3496 thread.daemon = True
3497 thread.start()
3498
3499 time.sleep(0.5)
3500 file.write('!')
3501 file.flush()
3502 """.format_map(locals())
3503 res, _ = run_python_until_end("-c", code)
3504 err = res.err.decode()
3505 if res.rc != 0:
3506 # Failure: should be a fatal error
3507 self.assertIn("Fatal Python error: could not acquire lock "
3508 "for <_io.BufferedWriter name='<{stream_name}>'> "
3509 "at interpreter shutdown, possibly due to "
3510 "daemon threads".format_map(locals()),
3511 err)
3512 else:
3513 self.assertFalse(err.strip('.!'))
3514
3515 def test_daemon_threads_shutdown_stdout_deadlock(self):
3516 self.check_daemon_threads_shutdown_deadlock('stdout')
3517
3518 def test_daemon_threads_shutdown_stderr_deadlock(self):
3519 self.check_daemon_threads_shutdown_deadlock('stderr')
3520
3521
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003522class PyMiscIOTest(MiscIOTest):
3523 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003524
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003525
3526@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3527class SignalsTest(unittest.TestCase):
3528
3529 def setUp(self):
3530 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3531
3532 def tearDown(self):
3533 signal.signal(signal.SIGALRM, self.oldalrm)
3534
3535 def alarm_interrupt(self, sig, frame):
3536 1/0
3537
3538 @unittest.skipUnless(threading, 'Threading required for this test.')
3539 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3540 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003541 invokes the signal handler, and bubbles up the exception raised
3542 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003543 read_results = []
3544 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003545 if hasattr(signal, 'pthread_sigmask'):
3546 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003547 s = os.read(r, 1)
3548 read_results.append(s)
3549 t = threading.Thread(target=_read)
3550 t.daemon = True
3551 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003552 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01003553 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003554 try:
3555 wio = self.io.open(w, **fdopen_kwargs)
3556 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003557 # Fill the pipe enough that the write will be blocking.
3558 # It will be interrupted by the timer armed above. Since the
3559 # other thread has read one byte, the low-level write will
3560 # return with a successful (partial) result rather than an EINTR.
3561 # The buffered IO layer must check for pending signal
3562 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003563 signal.alarm(1)
3564 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01003565 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02003566 finally:
3567 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003568 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003569 # We got one byte, get another one and check that it isn't a
3570 # repeat of the first one.
3571 read_results.append(os.read(r, 1))
3572 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3573 finally:
3574 os.close(w)
3575 os.close(r)
3576 # This is deliberate. If we didn't close the file descriptor
3577 # before closing wio, wio would try to flush its internal
3578 # buffer, and block again.
3579 try:
3580 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003581 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003582 if e.errno != errno.EBADF:
3583 raise
3584
3585 def test_interrupted_write_unbuffered(self):
3586 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3587
3588 def test_interrupted_write_buffered(self):
3589 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3590
Victor Stinner6ab72862014-09-03 23:32:28 +02003591 # Issue #22331: The test hangs on FreeBSD 7.2
3592 @support.requires_freebsd_version(8)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003593 def test_interrupted_write_text(self):
3594 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3595
Brett Cannon31f59292011-02-21 19:29:56 +00003596 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003597 def check_reentrant_write(self, data, **fdopen_kwargs):
3598 def on_alarm(*args):
3599 # Will be called reentrantly from the same thread
3600 wio.write(data)
3601 1/0
3602 signal.signal(signal.SIGALRM, on_alarm)
3603 r, w = os.pipe()
3604 wio = self.io.open(w, **fdopen_kwargs)
3605 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003606 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003607 # Either the reentrant call to wio.write() fails with RuntimeError,
3608 # or the signal handler raises ZeroDivisionError.
3609 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3610 while 1:
3611 for i in range(100):
3612 wio.write(data)
3613 wio.flush()
3614 # Make sure the buffer doesn't fill up and block further writes
3615 os.read(r, len(data) * 100)
3616 exc = cm.exception
3617 if isinstance(exc, RuntimeError):
3618 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3619 finally:
3620 wio.close()
3621 os.close(r)
3622
3623 def test_reentrant_write_buffered(self):
3624 self.check_reentrant_write(b"xy", mode="wb")
3625
3626 def test_reentrant_write_text(self):
3627 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3628
Antoine Pitrou707ce822011-02-25 21:24:11 +00003629 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3630 """Check that a buffered read, when it gets interrupted (either
3631 returning a partial result or EINTR), properly invokes the signal
3632 handler and retries if the latter returned successfully."""
3633 r, w = os.pipe()
3634 fdopen_kwargs["closefd"] = False
3635 def alarm_handler(sig, frame):
3636 os.write(w, b"bar")
3637 signal.signal(signal.SIGALRM, alarm_handler)
3638 try:
3639 rio = self.io.open(r, **fdopen_kwargs)
3640 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003641 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003642 # Expected behaviour:
3643 # - first raw read() returns partial b"foo"
3644 # - second raw read() returns EINTR
3645 # - third raw read() returns b"bar"
3646 self.assertEqual(decode(rio.read(6)), "foobar")
3647 finally:
3648 rio.close()
3649 os.close(w)
3650 os.close(r)
3651
Antoine Pitrou20db5112011-08-19 20:32:34 +02003652 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003653 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3654 mode="rb")
3655
Antoine Pitrou20db5112011-08-19 20:32:34 +02003656 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003657 self.check_interrupted_read_retry(lambda x: x,
3658 mode="r")
3659
3660 @unittest.skipUnless(threading, 'Threading required for this test.')
3661 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3662 """Check that a buffered write, when it gets interrupted (either
3663 returning a partial result or EINTR), properly invokes the signal
3664 handler and retries if the latter returned successfully."""
3665 select = support.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003666
Antoine Pitrou707ce822011-02-25 21:24:11 +00003667 # A quantity that exceeds the buffer size of an anonymous pipe's
3668 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02003669 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00003670 r, w = os.pipe()
3671 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003672
Antoine Pitrou707ce822011-02-25 21:24:11 +00003673 # We need a separate thread to read from the pipe and allow the
3674 # write() to finish. This thread is started after the SIGALRM is
3675 # received (forcing a first EINTR in write()).
3676 read_results = []
3677 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003678 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00003679 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003680 try:
3681 while not write_finished:
3682 while r in select.select([r], [], [], 1.0)[0]:
3683 s = os.read(r, 1024)
3684 read_results.append(s)
3685 except BaseException as exc:
3686 nonlocal error
3687 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00003688 t = threading.Thread(target=_read)
3689 t.daemon = True
3690 def alarm1(sig, frame):
3691 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003692 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003693 def alarm2(sig, frame):
3694 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003695
3696 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00003697 signal.signal(signal.SIGALRM, alarm1)
3698 try:
3699 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003700 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003701 # Expected behaviour:
3702 # - first raw write() is partial (because of the limited pipe buffer
3703 # and the first alarm)
3704 # - second raw write() returns EINTR (because of the second alarm)
3705 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003706 written = wio.write(large_data)
3707 self.assertEqual(N, written)
3708
Antoine Pitrou707ce822011-02-25 21:24:11 +00003709 wio.flush()
3710 write_finished = True
3711 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01003712
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003713 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003714 self.assertEqual(N, sum(len(x) for x in read_results))
3715 finally:
3716 write_finished = True
3717 os.close(w)
3718 os.close(r)
3719 # This is deliberate. If we didn't close the file descriptor
3720 # before closing wio, wio would try to flush its internal
3721 # buffer, and could block (in case of failure).
3722 try:
3723 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003724 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00003725 if e.errno != errno.EBADF:
3726 raise
3727
Antoine Pitrou20db5112011-08-19 20:32:34 +02003728 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003729 self.check_interrupted_write_retry(b"x", mode="wb")
3730
Antoine Pitrou20db5112011-08-19 20:32:34 +02003731 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003732 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3733
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003734
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003735class CSignalsTest(SignalsTest):
3736 io = io
3737
3738class PySignalsTest(SignalsTest):
3739 io = pyio
3740
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003741 # Handling reentrancy issues would slow down _pyio even more, so the
3742 # tests are disabled.
3743 test_reentrant_write_buffered = None
3744 test_reentrant_write_text = None
3745
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003746
Ezio Melottidaa42c72013-03-23 16:30:16 +02003747def load_tests(*args):
Gregory P. Smith1bef9072015-04-14 13:24:34 -07003748 tests = (CIOTest, PyIOTest, APIMismatchTest,
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003749 CBufferedReaderTest, PyBufferedReaderTest,
3750 CBufferedWriterTest, PyBufferedWriterTest,
3751 CBufferedRWPairTest, PyBufferedRWPairTest,
3752 CBufferedRandomTest, PyBufferedRandomTest,
3753 StatefulIncrementalDecoderTest,
3754 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3755 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003756 CMiscIOTest, PyMiscIOTest,
3757 CSignalsTest, PySignalsTest,
3758 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003759
3760 # Put the namespaces of the IO module we are testing and some useful mock
3761 # classes in the __dict__ of each test.
3762 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003763 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003764 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3765 c_io_ns = {name : getattr(io, name) for name in all_members}
3766 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3767 globs = globals()
3768 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3769 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3770 # Avoid turning open into a bound method.
3771 py_io_ns["open"] = pyio.OpenWrapper
3772 for test in tests:
3773 if test.__name__.startswith("C"):
3774 for name, obj in c_io_ns.items():
3775 setattr(test, name, obj)
3776 elif test.__name__.startswith("Py"):
3777 for name, obj in py_io_ns.items():
3778 setattr(test, name, obj)
3779
Ezio Melottidaa42c72013-03-23 16:30:16 +02003780 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
3781 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003782
3783if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02003784 unittest.main()