blob: 6a41ae62d9cab091b4fecb516a4b3bb9d1b484a9 [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Antoine Pitrou712cb732013-12-21 15:51:54 +010038from test.script_helper import assert_python_ok
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
def _default_chunk_size():
    """Return the ``_CHUNK_SIZE`` of a freshly opened text file object.

    This very file is opened in text mode purely to obtain a wrapper
    instance; nothing is read from it before it is closed again.
    """
    with open(__file__, "r", encoding="latin-1") as probe:
        chunk_size = probe._CHUNK_SIZE
    return chunk_size
52
53
class MockRawIOWithoutRead:
    """Scripted raw stream that deliberately lacks a read() method.

    Because only readinto() is provided, mixing this class with a RawIOBase
    exercises the base class's default read(), which calls readinto().

    Bookkeeping attributes:
      _read_stack       -- chunks still to be served; a None entry makes
                           readinto() return None once
      _write_stack      -- bytes copy of every buffer passed to write()
      _reads            -- number of read attempts made so far
      _extraneous_reads -- read attempts made after the stack ran dry
    """

    def __init__(self, read_stack=()):
        self._reads = 0
        self._extraneous_reads = 0
        self._write_stack = []
        self._read_stack = list(read_stack)

    # -- capabilities: everything is claimed to be supported ---------------

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        # Fixed dummy descriptor number.
        return 42

    # -- positioning stubs -------------------------------------------------

    def seek(self, pos, whence):
        # Not a real position, but something has to be returned.
        return 0

    def tell(self):
        # Same remark as for seek().
        return 0

    def truncate(self, pos=None):
        # Nothing is resized; the argument is handed straight back.
        return pos

    # -- data transfer -----------------------------------------------------

    def write(self, b):
        """Record a bytes copy of *b* and report it as fully written."""
        snapshot = bytes(b)
        self._write_stack.append(snapshot)
        return len(b)

    def readinto(self, buf):
        """Copy the next scripted chunk into *buf*.

        Returns the number of bytes stored, None when the next scripted
        entry is None, or 0 once the script is exhausted.  A chunk larger
        than *buf* is split: the part that fits is delivered now and the
        remainder stays at the head of the stack.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0

        head = self._read_stack[0]
        if head is None:
            self._read_stack.pop(0)
            return None

        size = len(head)
        if size > capacity:
            # Fill the whole buffer and keep the tail for the next call.
            buf[:] = head[:capacity]
            self._read_stack[0] = head[capacity:]
            return capacity

        self._read_stack.pop(0)
        buf[:size] = head
        return size


class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """The mock combined with the C implementation's RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """The mock combined with the pure-Python RawIOBase."""
115
116
class MockRawIO(MockRawIOWithoutRead):
    """Scripted raw stream that additionally implements read().

    read() hands out the queued chunks one per call, whatever size was
    requested.
    """

    def read(self, n=None):
        """Pop and return the next queued chunk; *n* is ignored.

        Every call increments ``_reads``.  Once the queue is empty the call
        is also counted in ``_extraneous_reads`` and ``b""`` is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue means "end of stream".  Anything else
            # (KeyboardInterrupt, a broken fixture, ...) must propagate
            # instead of being reported as EOF.
            self._extraneous_reads += 1
            return b""


class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with the C implementation's RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with the pure-Python RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000132
Guido van Rossuma9e20242007-03-08 00:43:48 +0000133
class MisbehavedRawIO(MockRawIO):
    """Raw stream mock whose methods report bogus results on purpose.

    Every override still lets the parent class do its bookkeeping, then
    returns a value that does not match what actually happened.
    """

    def write(self, b):
        # Report twice as many bytes as the parent recorded.
        accepted = super().write(b)
        return accepted * 2

    def read(self, n=None):
        # Hand back the parent's result repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        # Negative stream position.
        return -123

    def tell(self):
        # Negative stream position.
        return -456

    def readinto(self, buf):
        # The parent fills *buf*; the reported count is five times the
        # buffer size.
        super().readinto(buf)
        capacity = len(buf)
        return capacity * 5


class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with the C implementation's RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with the pure-Python RawIOBase."""
156
157
class CloseFailureIO(MockRawIO):
    """Raw stream mock whose first close() call raises OSError.

    The ``closed`` flag is set before raising, so any later close() call
    returns without error.
    """

    closed = 0

    def close(self):
        if self.closed:
            return
        # Mark as closed first, then fail.
        self.closed = 1
        raise OSError


class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with the C implementation's RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with the pure-Python RawIOBase."""
171
172
class MockFileIO:
    """Mixin that logs the outcome of every read()/readinto() call.

    ``read_history`` collects, in call order, the length of what read()
    returned (or None when it returned None) and the raw return value of
    readinto().  The actual I/O is delegated to the next class in the MRO.
    """

    def __init__(self, data):
        # The log must exist before the parent initialiser runs.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count


class CMockFileIO(MockFileIO, io.BytesIO):
    """Read-logging mixin on top of the C BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """Read-logging mixin on top of the pure-Python BytesIO."""
194
195
class MockUnseekableIO:
    """Mixin that turns a stream into a non-seekable one.

    seekable() answers False, and both seek() and tell() raise the
    ``UnsupportedOperation`` class that the concrete subclass sets as a
    class attribute.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")

    def tell(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")


class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """Unseekable C BytesIO raising io.UnsupportedOperation."""

    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """Unseekable pure-Python BytesIO raising pyio.UnsupportedOperation."""

    UnsupportedOperation = pyio.UnsupportedOperation
211
212
class MockNonBlockWriterIO:
    """Raw writer mock that can simulate a partial or blocked write.

    After block_on(char), a write() whose data contains *char* only accepts
    the bytes in front of it.  A write() whose data starts with *char*
    accepts nothing, returns None and disarms the blocker.
    """

    def __init__(self):
        self._blocker_char = None
        self._write_stack = []

    def pop_written(self):
        """Return everything accepted so far as one bytes object and forget it."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def write(self, b):
        """Accept *b* up to the armed blocker, if any.

        Returns the number of bytes accepted, or None when the data starts
        with the blocker (in which case the blocker is cleared).
        """
        payload = bytes(b)
        blocker = self._blocker_char
        # -1 means "no blocker armed" or "blocker not present in the data".
        cut = payload.find(blocker) if blocker else -1

        if cut > 0:
            # write data up to the first blocker
            self._write_stack.append(payload[:cut])
            return cut
        if cut == 0:
            # cancel blocker and indicate would block
            self._blocker_char = None
            return None

        self._write_stack.append(payload)
        return len(payload)


class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """The writer mock on top of the C RawIOBase."""

    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """The writer mock on top of the pure-Python RawIOBase."""

    BlockingIOError = pyio.BlockingIOError
262
Guido van Rossuma9e20242007-03-08 00:43:48 +0000263
Guido van Rossum28524c72007-02-27 05:47:44 +0000264class IOTest(unittest.TestCase):
265
Neal Norwitze7789b12008-03-24 06:18:09 +0000266 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000267 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000268
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000269 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000270 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000271
Guido van Rossum28524c72007-02-27 05:47:44 +0000272 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000273 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000274 f.truncate(0)
275 self.assertEqual(f.tell(), 5)
276 f.seek(0)
277
278 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000279 self.assertEqual(f.seek(0), 0)
280 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000281 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000282 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000283 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000284 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000285 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000286 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000287 self.assertEqual(f.seek(-1, 2), 13)
288 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000289
Guido van Rossum87429772007-04-10 21:06:59 +0000290 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000291 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000292 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000293
Guido van Rossum9b76da62007-04-11 01:09:03 +0000294 def read_ops(self, f, buffered=False):
295 data = f.read(5)
296 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000297 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000298 self.assertEqual(f.readinto(data), 5)
299 self.assertEqual(data, b" worl")
300 self.assertEqual(f.readinto(data), 2)
301 self.assertEqual(len(data), 5)
302 self.assertEqual(data[:2], b"d\n")
303 self.assertEqual(f.seek(0), 0)
304 self.assertEqual(f.read(20), b"hello world\n")
305 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000306 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000307 self.assertEqual(f.seek(-6, 2), 6)
308 self.assertEqual(f.read(5), b"world")
309 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000310 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000311 self.assertEqual(f.seek(-6, 1), 5)
312 self.assertEqual(f.read(5), b" worl")
313 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000314 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 if buffered:
316 f.seek(0)
317 self.assertEqual(f.read(), b"hello world\n")
318 f.seek(6)
319 self.assertEqual(f.read(), b"world\n")
320 self.assertEqual(f.read(), b"")
321
Guido van Rossum34d69e52007-04-10 20:08:41 +0000322 LARGE = 2**31
323
Guido van Rossum53807da2007-04-10 19:01:47 +0000324 def large_file_ops(self, f):
325 assert f.readable()
326 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000327 self.assertEqual(f.seek(self.LARGE), self.LARGE)
328 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000329 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000330 self.assertEqual(f.tell(), self.LARGE + 3)
331 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000332 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000333 self.assertEqual(f.tell(), self.LARGE + 2)
334 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000335 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000336 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000337 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
338 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000339 self.assertEqual(f.read(2), b"x")
340
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000341 def test_invalid_operations(self):
342 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000343 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000344 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000345 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000346 self.assertRaises(exc, fp.read)
347 self.assertRaises(exc, fp.readline)
348 with self.open(support.TESTFN, "wb", buffering=0) as fp:
349 self.assertRaises(exc, fp.read)
350 self.assertRaises(exc, fp.readline)
351 with self.open(support.TESTFN, "rb", buffering=0) as fp:
352 self.assertRaises(exc, fp.write, b"blah")
353 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000354 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000355 self.assertRaises(exc, fp.write, b"blah")
356 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000357 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000358 self.assertRaises(exc, fp.write, "blah")
359 self.assertRaises(exc, fp.writelines, ["blah\n"])
360 # Non-zero seeking from current or end pos
361 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
362 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000363
Antoine Pitrou13348842012-01-29 18:36:34 +0100364 def test_open_handles_NUL_chars(self):
365 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300366 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
367 self.assertRaises(ValueError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100368
Guido van Rossum28524c72007-02-27 05:47:44 +0000369 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000370 with self.open(support.TESTFN, "wb", buffering=0) as f:
371 self.assertEqual(f.readable(), False)
372 self.assertEqual(f.writable(), True)
373 self.assertEqual(f.seekable(), True)
374 self.write_ops(f)
375 with self.open(support.TESTFN, "rb", buffering=0) as f:
376 self.assertEqual(f.readable(), True)
377 self.assertEqual(f.writable(), False)
378 self.assertEqual(f.seekable(), True)
379 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000380
Guido van Rossum87429772007-04-10 21:06:59 +0000381 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000382 with self.open(support.TESTFN, "wb") as f:
383 self.assertEqual(f.readable(), False)
384 self.assertEqual(f.writable(), True)
385 self.assertEqual(f.seekable(), True)
386 self.write_ops(f)
387 with self.open(support.TESTFN, "rb") as f:
388 self.assertEqual(f.readable(), True)
389 self.assertEqual(f.writable(), False)
390 self.assertEqual(f.seekable(), True)
391 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000392
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000393 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000394 with self.open(support.TESTFN, "wb") as f:
395 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
396 with self.open(support.TESTFN, "rb") as f:
397 self.assertEqual(f.readline(), b"abc\n")
398 self.assertEqual(f.readline(10), b"def\n")
399 self.assertEqual(f.readline(2), b"xy")
400 self.assertEqual(f.readline(4), b"zzy\n")
401 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000402 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000403 self.assertRaises(TypeError, f.readline, 5.3)
404 with self.open(support.TESTFN, "r") as f:
405 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000406
Guido van Rossum28524c72007-02-27 05:47:44 +0000407 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000408 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000409 self.write_ops(f)
410 data = f.getvalue()
411 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000412 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000413 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000414
Guido van Rossum53807da2007-04-10 19:01:47 +0000415 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000416 # On Windows and Mac OSX this test comsumes large resources; It takes
417 # a long time to build the >2GB file and takes >2GB of disk space
418 # therefore the resource must be enabled to run this test.
419 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600420 support.requires(
421 'largefile',
422 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000423 with self.open(support.TESTFN, "w+b", 0) as f:
424 self.large_file_ops(f)
425 with self.open(support.TESTFN, "w+b") as f:
426 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000427
428 def test_with_open(self):
429 for bufsize in (0, 1, 100):
430 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000431 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000432 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000433 self.assertEqual(f.closed, True)
434 f = None
435 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000436 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000437 1/0
438 except ZeroDivisionError:
439 self.assertEqual(f.closed, True)
440 else:
441 self.fail("1/0 didn't raise an exception")
442
Antoine Pitrou08838b62009-01-21 00:55:13 +0000443 # issue 5008
444 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000445 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000446 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000447 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000448 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000449 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000450 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000451 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000452 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000453
Guido van Rossum87429772007-04-10 21:06:59 +0000454 def test_destructor(self):
455 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000456 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000457 def __del__(self):
458 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000459 try:
460 f = super().__del__
461 except AttributeError:
462 pass
463 else:
464 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000465 def close(self):
466 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000467 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000468 def flush(self):
469 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000470 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000471 with support.check_warnings(('', ResourceWarning)):
472 f = MyFileIO(support.TESTFN, "wb")
473 f.write(b"xxx")
474 del f
475 support.gc_collect()
476 self.assertEqual(record, [1, 2, 3])
477 with self.open(support.TESTFN, "rb") as f:
478 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000479
480 def _check_base_destructor(self, base):
481 record = []
482 class MyIO(base):
483 def __init__(self):
484 # This exercises the availability of attributes on object
485 # destruction.
486 # (in the C version, close() is called by the tp_dealloc
487 # function, not by __del__)
488 self.on_del = 1
489 self.on_close = 2
490 self.on_flush = 3
491 def __del__(self):
492 record.append(self.on_del)
493 try:
494 f = super().__del__
495 except AttributeError:
496 pass
497 else:
498 f()
499 def close(self):
500 record.append(self.on_close)
501 super().close()
502 def flush(self):
503 record.append(self.on_flush)
504 super().flush()
505 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000506 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000507 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000508 self.assertEqual(record, [1, 2, 3])
509
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000510 def test_IOBase_destructor(self):
511 self._check_base_destructor(self.IOBase)
512
513 def test_RawIOBase_destructor(self):
514 self._check_base_destructor(self.RawIOBase)
515
516 def test_BufferedIOBase_destructor(self):
517 self._check_base_destructor(self.BufferedIOBase)
518
519 def test_TextIOBase_destructor(self):
520 self._check_base_destructor(self.TextIOBase)
521
Guido van Rossum87429772007-04-10 21:06:59 +0000522 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000523 with self.open(support.TESTFN, "wb") as f:
524 f.write(b"xxx")
525 with self.open(support.TESTFN, "rb") as f:
526 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000527
Guido van Rossumd4103952007-04-12 05:44:49 +0000528 def test_array_writes(self):
529 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000530 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000531 with self.open(support.TESTFN, "wb", 0) as f:
532 self.assertEqual(f.write(a), n)
533 with self.open(support.TESTFN, "wb") as f:
534 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000535
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000536 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000537 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000538 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000539
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000540 def test_read_closed(self):
541 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000542 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000543 with self.open(support.TESTFN, "r") as f:
544 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000545 self.assertEqual(file.read(), "egg\n")
546 file.seek(0)
547 file.close()
548 self.assertRaises(ValueError, file.read)
549
550 def test_no_closefd_with_filename(self):
551 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000552 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000553
554 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000555 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000556 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000557 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000558 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000559 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000560 self.assertEqual(file.buffer.raw.closefd, False)
561
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000562 def test_garbage_collection(self):
563 # FileIO objects are collected, and collecting them flushes
564 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000565 with support.check_warnings(('', ResourceWarning)):
566 f = self.FileIO(support.TESTFN, "wb")
567 f.write(b"abcxxx")
568 f.f = f
569 wr = weakref.ref(f)
570 del f
571 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000572 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000573 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000574 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000575
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000576 def test_unbounded_file(self):
577 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
578 zero = "/dev/zero"
579 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000580 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000581 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000582 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000583 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000584 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000585 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000586 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000587 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000588 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000589 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000590 self.assertRaises(OverflowError, f.read)
591
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200592 def check_flush_error_on_close(self, *args, **kwargs):
593 # Test that the file is closed despite failed flush
594 # and that flush() is called before file closed.
595 f = self.open(*args, **kwargs)
596 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000597 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200598 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200599 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000600 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200601 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600602 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200603 self.assertTrue(closed) # flush() called
604 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200605 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200606
607 def test_flush_error_on_close(self):
608 # raw file
609 # Issue #5700: io.FileIO calls flush() after file closed
610 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
611 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
612 self.check_flush_error_on_close(fd, 'wb', buffering=0)
613 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
614 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
615 os.close(fd)
616 # buffered io
617 self.check_flush_error_on_close(support.TESTFN, 'wb')
618 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
619 self.check_flush_error_on_close(fd, 'wb')
620 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
621 self.check_flush_error_on_close(fd, 'wb', closefd=False)
622 os.close(fd)
623 # text io
624 self.check_flush_error_on_close(support.TESTFN, 'w')
625 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
626 self.check_flush_error_on_close(fd, 'w')
627 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
628 self.check_flush_error_on_close(fd, 'w', closefd=False)
629 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000630
631 def test_multi_close(self):
632 f = self.open(support.TESTFN, "wb", buffering=0)
633 f.close()
634 f.close()
635 f.close()
636 self.assertRaises(ValueError, f.flush)
637
Antoine Pitrou328ec742010-09-14 18:37:24 +0000638 def test_RawIOBase_read(self):
639 # Exercise the default RawIOBase.read() implementation (which calls
640 # readinto() internally).
641 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
642 self.assertEqual(rawio.read(2), b"ab")
643 self.assertEqual(rawio.read(2), b"c")
644 self.assertEqual(rawio.read(2), b"d")
645 self.assertEqual(rawio.read(2), None)
646 self.assertEqual(rawio.read(2), b"ef")
647 self.assertEqual(rawio.read(2), b"g")
648 self.assertEqual(rawio.read(2), None)
649 self.assertEqual(rawio.read(2), b"")
650
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400651 def test_types_have_dict(self):
652 test = (
653 self.IOBase(),
654 self.RawIOBase(),
655 self.TextIOBase(),
656 self.StringIO(),
657 self.BytesIO()
658 )
659 for obj in test:
660 self.assertTrue(hasattr(obj, "__dict__"))
661
Ross Lagerwall59142db2011-10-31 20:34:46 +0200662 def test_opener(self):
663 with self.open(support.TESTFN, "w") as f:
664 f.write("egg\n")
665 fd = os.open(support.TESTFN, os.O_RDONLY)
666 def opener(path, flags):
667 return fd
668 with self.open("non-existent", "r", opener=opener) as f:
669 self.assertEqual(f.read(), "egg\n")
670
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200671 def test_fileio_closefd(self):
672 # Issue #4841
673 with self.open(__file__, 'rb') as f1, \
674 self.open(__file__, 'rb') as f2:
675 fileio = self.FileIO(f1.fileno(), closefd=False)
676 # .__init__() must not close f1
677 fileio.__init__(f2.fileno(), closefd=False)
678 f1.readline()
679 # .close() must not close f2
680 fileio.close()
681 f2.readline()
682
def test_nonbuffered_textio(self):
    # Opening a text file with buffering=0 must raise ValueError, and no
    # warning may be recorded, even after a garbage collection pass.
    with warnings.catch_warnings(record=True) as caught:
        self.assertRaises(ValueError,
                          self.open, support.TESTFN, 'w', buffering=0)
        support.gc_collect()
    self.assertEqual(caught, [])
689
def test_invalid_newline(self):
    # An unsupported newline argument must raise ValueError, and no
    # warning may be recorded, even after a garbage collection pass.
    with warnings.catch_warnings(record=True) as caught:
        self.assertRaises(ValueError,
                          self.open, support.TESTFN, 'w', newline='invalid')
        support.gc_collect()
    self.assertEqual(caught, [])
696
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200697
class CIOTest(IOTest):
    # IOTest cases plus a regression test added only in this subclass.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # A throwaway instance populates the method cache.
        MyIO()
        cyclic = MyIO()
        cyclic.obj = cyclic          # self-reference: only the GC can free it
        ref = weakref.ref(cyclic)
        # Drop the class first, then the instance, so both are only
        # reachable through the cycle when the collector runs.
        del MyIO
        del cyclic
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000717
class PyIOTest(IOTest):
    # Adds no tests of its own; it only re-runs the inherited IOTest cases.
    # NOTE(review): presumably bound to the pure-Python implementation by
    # code outside this class -- confirm where its attributes are set.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000720
Guido van Rossuma9e20242007-03-08 00:43:48 +0000721
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    # This is a mixin: concrete test classes are expected to provide
    # ``self.tp`` (the buffered class under test) and the mock raw I/O
    # classes referenced below.

    def test_detach(self):
        # detach() returns the raw stream once; a second call must fail.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        self.assertRaises(ValueError, buf.detach)

        repr(buf)  # Should still work

    def test_fileno(self):
        # fileno() must report 42 -- presumably the value the mock raw
        # stream returns (the mock is defined outside this class).
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    @unittest.skip('test having existential crisis')
    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 9)

    def test_override_destructor(self):
        # A subclass overriding __del__(), close() and flush() must see
        # them invoked in that order (recorded as 1, 2, 3) on destruction;
        # flush() is only expected when the object is writable.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    # The base class may not define __del__ at all.
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        # Query writability before the object is destroyed.
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            # Accessing a missing attribute raises AttributeError while the
            # temporary buffered object is being discarded.
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception OSError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        # repr() shows the module-qualified class name, plus the raw
        # stream's name (str or bytes) once it has one.
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        closed = []
        def bad_flush():
            # Snapshot the "closed" state of both layers at flush time.
            closed[:] = [b.closed, raw.closed]
            raise OSError()
        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(OSError, b.close) # exception not swallowed
        self.assertTrue(b.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(closed)      # flush() called
        self.assertFalse(closed[0])  # flush() called before file closed
        self.assertFalse(closed[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, close() must raise
        # the close error with the flush error chained as __context__,
        # and the buffered object must not be marked closed.
        raw = self.MockRawIO()
        def bad_flush():
            raise OSError('flush')
        def bad_close():
            raise OSError('close')
        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(OSError) as err: # exception not swallowed
            b.close()
        self.assertEqual(err.exception.args, ('close',))
        self.assertIsInstance(err.exception.__context__, OSError)
        self.assertEqual(err.exception.__context__.args, ('flush',))
        self.assertFalse(b.closed)
        # The object is still open, so it will be closed again when it is
        # finalized.  Restore harmless callables so that this late close
        # does not raise a second, unraisable OSError.
        raw.close = lambda: None
        b.flush = lambda: None

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw = self.MockRawIO()
        # The undefined names below are deliberate: evaluating them raises
        # NameError, which is the exception this test expects.
        def bad_flush():
            raise non_existing_flush
        def bad_close():
            raise non_existing_close
        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(NameError) as err: # exception not swallowed
            b.close()
        self.assertIn('non_existing_close', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('non_existing_flush', str(err.exception.__context__))
        self.assertFalse(b.closed)
        # Same reasoning as in test_close_error_on_close: keep the close
        # performed at finalization time from raising again.
        raw.close = lambda: None
        b.flush = lambda: None

    def test_multi_close(self):
        # Repeated close() calls are accepted; flush() afterwards is not.
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)

    def test_unseekable(self):
        # tell() and seek() on top of an unseekable raw stream must raise
        # UnsupportedOperation.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)

    def test_readonly_attributes(self):
        # The ``raw`` attribute cannot be reassigned.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        x = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buf.raw = x
887
Guido van Rossum78892e42007-04-06 17:31:18 +0000888
class SizeofTest:
    # Mixin checking how sys.getsizeof() accounts for the internal buffer.

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must grow by exactly the buffer size: the
        # overhead measured with one buffer size predicts the total for
        # another.
        small, large = 4096, 8192
        small_bufio = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(small_bufio) - small
        large_bufio = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(large_bufio), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the reported size must no longer include the buffer.
        buffer_size = 4096
        bufio = self.tp(self.MockRawIO(), buffer_size=buffer_size)
        overhead = sys.getsizeof(bufio) - buffer_size
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200910
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Reader-side tests; subclasses are expected to set ``tp`` to the
    # BufferedReader implementation under test.
    read_mode = "rb"

    def test_constructor(self):
        # __init__() may be called again on a live object; a non-positive
        # buffer size is rejected, and a later valid __init__() makes the
        # reader usable again.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object created with __new__() only must be safe to discard,
        # must refuse to read, and must work once __init__() has run.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        # read(None) and read(7) must both return all seven scripted bytes.
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # read1() must be served from the buffer when possible; the raw
        # read counter (rawio._reads) is checked after every call.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        # readinto() fills the target as far as data allows and leaves
        # the remaining bytes of the target untouched.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")
        # Same check with a raw stream whose script ends with None.
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

    def test_readinto1(self):
        # readinto1() is checked against both its return value and the
        # number of raw reads recorded in rawio._reads.
        buffer_size = 10
        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
        bufio = self.tp(rawio, buffer_size=buffer_size)
        b = bytearray(2)
        self.assertEqual(bufio.peek(3), b'abc')
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 1)
        self.assertEqual(b[:1], b"c")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"de")
        self.assertEqual(rawio._reads, 2)
        # Now use a target larger than the internal buffer.
        b = bytearray(2*buffer_size)
        self.assertEqual(bufio.peek(3), b'fgh')
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(bufio.readinto1(b), 6)
        self.assertEqual(b[:6], b"fghjkl")
        self.assertEqual(rawio._reads, 4)

    def test_readinto_array(self):
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        # Sanity check on the fixture.  A unittest assertion is used rather
        # than a bare ``assert`` so the check is not stripped under -O.
        self.assertNotEqual(len(b), 16)

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = bufio.readinto(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readinto1_array(self):
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        # Sanity check on the fixture.  A unittest assertion is used rather
        # than a bare ``assert`` so the check is not stripped under -O.
        self.assertNotEqual(len(b), 16)

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = bufio.readinto1(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readlines(self):
        # readlines() with no hint, a size hint of 5, and an explicit None.
        def bufio():
            # Fresh reader over the same three-chunk script for each call.
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to bufio.read(), and the
        # raw read sizes expected in rawio.read_history.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # The raw stream's own readall() shows the same None behaviour.
        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        # read() without a size returns everything.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            payload = bytes(bytearray(l))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        # Record for the main thread, then let it propagate.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=reader) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Every byte value must have been read exactly N times.
                combined = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(combined.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # seek()/tell() must keep failing even after some data was read.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        # seek() and tell() on top of MisbehavedRawIO must raise OSError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1181
1182
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the reader tests (plus the sizeof checks) against
    # io.BufferedReader.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed __init__() the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The "w+b" open below creates TESTFN; make sure it is removed
        # afterwards so the file cannot leak into other tests.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.f = f          # self-reference: only the GC can free it
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1229
1230
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited reader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001233
Guido van Rossuma9e20242007-03-08 00:43:48 +00001234
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001235class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1236 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001237
def test_constructor(self):
    # __init__() may be re-run on a live object; a non-positive buffer
    # size is rejected with ValueError, and a later valid __init__()
    # leaves the writer usable.  Everything written must reach the raw
    # stream in order.
    sink = self.MockRawIO()
    writer = self.tp(sink)
    writer.__init__(sink)
    for good_size in (1024, 16):
        writer.__init__(sink, buffer_size=good_size)
    self.assertEqual(3, writer.write(b"abc"))
    writer.flush()
    for bad_size in (0, -16, -1):
        self.assertRaises(ValueError,
                          writer.__init__, sink, buffer_size=bad_size)
    writer.__init__(sink)
    self.assertEqual(3, writer.write(b"ghi"))
    writer.flush()
    self.assertEqual(b"".join(sink._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001253
def test_uninitialized(self):
    # An object created with __new__() only must be safe to discard,
    # must refuse to write, and must work once __init__() has run.
    cls = self.tp
    discarded = cls.__new__(cls)
    del discarded
    writer = cls.__new__(cls)
    # Look the method up first so only the call itself is guarded.
    write = writer.write
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        write(b'')
    writer.__init__(self.MockRawIO())
    self.assertEqual(writer.write(b''), 0)
1263
def test_detach_flush(self):
    # Data still sitting in the buffer must be pushed to the raw stream
    # when the buffered writer is detached.
    payload = b"howdy!"
    sink = self.MockRawIO()
    writer = self.tp(sink)
    writer.write(payload)
    # Nothing has reached the raw stream yet.
    self.assertFalse(sink._write_stack)
    writer.detach()
    self.assertEqual(sink._write_stack, [payload])
1271
def test_write(self):
    # A write smaller than the 8-byte buffer must stay in the buffer:
    # the raw stream's write stack remains empty.
    sink = self.MockRawIO()
    buffered = self.tp(sink, 8)
    buffered.write(b"abc")
    self.assertFalse(sink._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001278
def test_write_overflow(self):
    # Write 16 bytes in 3-byte slices through an 8-byte buffer and check
    # what has reached the raw stream without an explicit flush.
    sink = self.MockRawIO()
    buffered = self.tp(sink, 8)
    contents = b"abcdefghijklmnop"
    step = 3
    offset = 0
    while offset < len(contents):
        buffered.write(contents[offset:offset + step])
        offset += step
    flushed = b"".join(sink._write_stack)
    # At least (total - 8) bytes were implicitly flushed, perhaps more
    # depending on the implementation.
    expected_prefix = contents[:-8]
    self.assertTrue(flushed.startswith(expected_prefix), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001289
def check_writes(self, intermediate_func):
    # Helper: perform lots of writes of growing size through a 13-byte
    # buffer, calling intermediate_func(bufio) after each one, and check
    # that the flushed output equals the input.
    contents = bytes(range(256)) * 1000
    total = len(contents)
    sink = self.MockRawIO()
    bufio = self.tp(sink, 13)
    # Write sizes: each N is used 15 times before moving on to N+1.
    sizes = (size for size in count(1) for _ in range(15))
    offset = 0
    while offset < total:
        # Never write past the end of the data.
        chunk_len = min(next(sizes), total - offset)
        chunk = contents[offset:offset + chunk_len]
        self.assertEqual(bufio.write(chunk), chunk_len)
        intermediate_func(bufio)
        offset += chunk_len
    bufio.flush()
    self.assertEqual(contents, b"".join(sink._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001309
def test_writes(self):
    # Plain sequence of writes with nothing done in between.
    def do_nothing(bufio):
        return None
    self.check_writes(do_nothing)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001312
def test_writes_and_flushes(self):
    # Flush explicitly after every single write.
    def flush_it(bufio):
        return bufio.flush()
    self.check_writes(flush_it)
Guido van Rossum01a27522007-03-07 01:00:12 +00001315
def test_writes_and_seeks(self):
    # Seek around after every write and return to the original position:
    # first with absolute offsets, then with relative ones.
    def seek_absolute(stream):
        here = stream.tell()
        for target in (here + 1, here - 1, here):
            stream.seek(target, 0)

    def seek_relative(stream):
        here = stream.seek(0, 1)
        for step in (+1, -1):
            stream.seek(step, 1)
        stream.seek(here, 0)

    for seeker in (seek_absolute, seek_relative):
        self.check_writes(seeker)
Guido van Rossum01a27522007-03-07 01:00:12 +00001329
def test_writes_and_truncates(self):
    # Truncate at the current position after every single write.
    def truncate_here(bufio):
        return bufio.truncate(bufio.tell())
    self.check_writes(truncate_here)
Guido van Rossum01a27522007-03-07 01:00:12 +00001332
def test_write_non_blocking(self):
    # Drive an 8-byte buffered writer over a raw stream that can be told
    # to block on a given byte, and check both the reported write counts
    # and what actually reached the raw stream.
    raw = self.MockNonBlockWriterIO()
    writer = self.tp(raw, 8)

    for chunk in (b"abcd", b"efghi"):
        self.assertEqual(writer.write(chunk), len(chunk))
    # 1 byte will be written, the rest will be buffered
    raw.block_on(b"k")
    self.assertEqual(writer.write(b"jklmn"), 5)

    # 8 bytes will be written, 8 will be buffered and the rest will be lost
    raw.block_on(b"0")
    try:
        writer.write(b"opqrwxyz0123456789")
    except self.BlockingIOError as exc:
        accepted = exc.characters_written
    else:
        self.fail("BlockingIOError should have been raised")
    self.assertEqual(accepted, 16)
    self.assertEqual(raw.pop_written(), b"abcdefghijklmnopqrwxyz")

    self.assertEqual(writer.write(b"ABCDEFGHI"), 9)
    flushed = raw.pop_written()
    # Previously buffered bytes were flushed
    self.assertTrue(flushed.startswith(b"01234567A"), flushed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001359
def test_write_and_rewind(self):
    # Overwrite the start of already-written data after seeking back,
    # then append more once positioned at the end again.
    backing = io.BytesIO()
    writer = self.tp(backing, 4)
    self.assertEqual(writer.write(b"abcdef"), 6)
    self.assertEqual(writer.tell(), 6)
    writer.seek(0, 0)
    self.assertEqual(writer.write(b"XY"), 2)
    writer.seek(6, 0)
    # The two overwritten bytes are visible in the backing store by now.
    self.assertEqual(backing.getvalue(), b"XYcdef")
    self.assertEqual(writer.write(b"123456"), 6)
    writer.flush()
    self.assertEqual(backing.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001372
def test_flush(self):
    # flush() must hand buffered data to the raw stream.
    sink = self.MockRawIO()
    buffered = self.tp(sink, 8)
    buffered.write(b"abc")
    buffered.flush()
    first_written = sink._write_stack[0]
    self.assertEqual(b"abc", first_written)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001379
def test_writelines(self):
    # writelines() on a plain list writes every item, in order.
    chunks = [b'ab', b'cd', b'ef']
    sink = self.MockRawIO()
    buffered = self.tp(sink, 8)
    buffered.writelines(chunks)
    buffered.flush()
    written = b''.join(sink._write_stack)
    self.assertEqual(written, b'abcdef')
1387
1388 def test_writelines_userlist(self):
1389 l = UserList([b'ab', b'cd', b'ef'])
1390 writer = self.MockRawIO()
1391 bufio = self.tp(writer, 8)
1392 bufio.writelines(l)
1393 bufio.flush()
1394 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1395
1396 def test_writelines_error(self):
1397 writer = self.MockRawIO()
1398 bufio = self.tp(writer, 8)
1399 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1400 self.assertRaises(TypeError, bufio.writelines, None)
1401 self.assertRaises(TypeError, bufio.writelines, 'abc')
1402
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001403 def test_destructor(self):
1404 writer = self.MockRawIO()
1405 bufio = self.tp(writer, 8)
1406 bufio.write(b"abc")
1407 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001408 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001409 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001410
1411 def test_truncate(self):
1412 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001413 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001414 bufio = self.tp(raw, 8)
1415 bufio.write(b"abcdef")
1416 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001417 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001418 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001419 self.assertEqual(f.read(), b"abc")
1420
    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        # Twenty threads drain a shared queue of byte chunks into one buffered
        # writer; afterwards every byte value must appear in the file exactly
        # N times, i.e. nothing was lost or duplicated.
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            # Alternate between 1-byte and 19-byte chunks.
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    # Worker: pop chunks until the queue is empty, recording
                    # (and re-raising) any exception raised by write().
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Each of the 256 byte values was written N times in total.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)
1471
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001472 def test_misbehaved_io(self):
1473 rawio = self.MisbehavedRawIO()
1474 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001475 self.assertRaises(OSError, bufio.seek, 0)
1476 self.assertRaises(OSError, bufio.tell)
1477 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001478
Florent Xicluna109d5732012-07-07 17:03:22 +02001479 def test_max_buffer_size_removal(self):
1480 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001481 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001482
Benjamin Peterson68623612012-12-20 11:53:11 -06001483 def test_write_error_on_close(self):
1484 raw = self.MockRawIO()
1485 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001486 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001487 raw.write = bad_write
1488 b = self.tp(raw)
1489 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001490 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001491 self.assertTrue(b.closed)
1492
Benjamin Peterson59406a92009-03-26 17:10:29 +00001493
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriterTest cases against io.BufferedWriter and
    # adds checks specific to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          buffered.__init__, raw, sys.maxsize)

    def test_initialization(self):
        # Re-initializing with a non-positive buffer size fails, and the
        # object then refuses writes with ValueError.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, buffered.__init__, raw,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, buffered.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            # Create a reference cycle so only the cyclic GC can free it.
            f.x = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as reader:
            on_disk = reader.read()
            self.assertEqual(on_disk, b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1537
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001538
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriterTest cases against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001541
class BufferedRWPairTest(unittest.TestCase):
    # Shared tests for a buffered reader/writer pair. The class under test is
    # read from self.tp, which this base class does not define itself.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_uninitialized(self):
        # An object made with __new__ only must be deletable, must raise
        # ValueError/AttributeError on read/write, and must work once
        # __init__ is finally called.
        pair = self.tp.__new__(self.tp)
        del pair
        pair = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.read, 0)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.write, b'')
        pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(pair.read(0), b'')
        self.assertEqual(pair.write(b''), 0)

    def test_detach(self):
        # detach() is not supported on a pair.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        # The reader side must report itself as readable.
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        # The writer side must report itself as writable.
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # read(None) returns everything that is left, like read().
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # A fresh pair is built for every call so each starts at offset 0.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        # Each write + flush shows up as its own chunk on the writer side.
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        # peek() may return more than requested and does not consume data.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_reader_close_error_on_close(self):
        # reader.close() fails with NameError: the error propagates out of
        # pair.close(), the writer is still closed and the pair reports
        # itself as closed.
        def reader_close():
            reader_non_existing
        reader = self.MockRawIO()
        reader.close = reader_close
        writer = self.MockRawIO()
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('reader_non_existing', str(err.exception))
        self.assertTrue(pair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

    def test_writer_close_error_on_close(self):
        # writer.close() fails with NameError: the error propagates, the
        # reader is still closed, and the pair does not report closed.
        def writer_close():
            writer_non_existing
        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('writer_non_existing', str(err.exception))
        self.assertFalse(pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

    def test_reader_writer_close_error_on_close(self):
        # Both close() calls fail: the reader's error is the one raised and
        # the writer's error is attached to it as __context__.
        def reader_close():
            reader_non_existing
        def writer_close():
            writer_non_existing
        reader = self.MockRawIO()
        reader.close = reader_close
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('reader_non_existing', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('writer_non_existing', str(err.exception.__context__))
        self.assertFalse(pair.closed)
        self.assertFalse(reader.closed)
        self.assertFalse(writer.closed)

    def test_isatty(self):
        # The pair reports isatty() as true when either side does.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

    def test_weakref_clearing(self):
        # Drop the pair first, then the weak reference that pointed to it.
        brw = self.tp(self.MockRawIO(), self.MockRawIO())
        ref = weakref.ref(brw)
        brw = None
        ref = None # Shouldn't segfault.
1725
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest cases against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001728
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest cases against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001731
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001732
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared tests for a buffered stream that both reads and writes. It
    # inherits the reader and writer test suites and overrides the tests that
    # exist in both so that each parent's version is run explicitly.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_uninitialized(self):
        BufferedReaderTest.test_uninitialized(self)
        BufferedWriterTest.test_uninitialized(self)

    def test_read_and_write(self):
        # Writes made after a read stay buffered; after the following read()
        # they are found in the raw stream as one combined chunk.
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite bytes 4..7, then re-read the whole stream.
        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        # whence=2: relative to the end; whence=1: relative to the position.
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Helper shared by the test_flush_and_* tests. read_func(bufio[, n])
        # must return up to n bytes and advance the stream by what it returns.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # Adapt readinto() to the read_func signature; a negative n
            # means "read everything", served by a 9999-byte scratch buffer.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position, so do it explicitly.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        # check_writes() is not defined in this class; it is called with a
        # callback that receives the buffered object.
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        def _peek(bufio):
            # Peek one byte back, then restore the original position.
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            # Step back one byte and read it again.
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        b = b"\x80\x81\x82\x83\x84"
        for i in range(0, len(b)):
            for j in range(i, len(b)):
                raw = self.BytesIO(b)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # Same order as mutate(): when i == j the 1 overwrites the 2.
                expected = bytearray(b)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        # truncate() with no argument uses the current logical position.
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        # Alternate one-byte writes with one-byte reads of every flavour.
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as f:
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                buf = bytearray(1)
                f.readinto(buf)
                self.assertEqual(buf, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        # Same idea, but starting with a read instead of a write.
        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Each one-byte write replaces the first byte of the next line.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as f:
                f.write(b'1')
                self.assertEqual(f.readline(), b'b\n')
                f.write(b'2')
                self.assertEqual(f.readline(), b'def\n')
                f.write(b'3')
                self.assertEqual(f.readline(), b'\n')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1960
R David Murray67bfe802013-02-23 21:51:05 -05001961
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Runs the shared BufferedRandomTest cases against io.BufferedRandom and
    # adds checks specific to that implementation.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          buffered.__init__, raw, sys.maxsize)

    def test_garbage_collection(self):
        # Run both the reader and the writer flavour of the GC test.
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1983
1984
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandomTest cases against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1987
1988
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001989# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1990# properties:
1991# - A single output character can correspond to many bytes of input.
1992# - The number of input bytes to complete the character can be
1993# undetermined until the last input byte is received.
1994# - The number of input bytes can vary depending on previous input.
1995# - A single input byte can correspond to many characters of output.
1996# - The number of output characters can be undetermined until the
1997# last input byte is received.
1998# - The number of output characters can vary depending on previous input.
1999
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    Stateful, buffering decoder for exercising seek/tell behavior.

    The input is read as a sequence of words.  A word is either
    fixed-length (exactly I bytes) or, when I is 0, variable-length and
    terminated by a period; in variable-length mode periods that do not
    end a word are ignored.  Words are interpreted as follows:
      - 'i' followed by a number sets the input length I (capped at 99);
        a missing number means 0, i.e. variable-length mode.
      - 'o' followed by a number sets the output length O (capped at 99);
        a missing number means 0.
      - Any other word is emitted followed by a period.  If O is nonzero
        the word is first padded with hyphens or truncated so that it is
        exactly O characters long; if O is 0 it is emitted verbatim.
    I and O both start at 1.  A call with final=True also terminates
    whatever word is still buffered.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        self.i = self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        # Both lengths are XOR-ed with 1 so that flags = 0 after reset().
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        pending, flags = state
        self.buffer = bytearray(pending)
        in_flag, out_flag = divmod(flags, 100)
        self.i = in_flag ^ 1
        self.o = out_flag ^ 1

    def decode(self, input, final=False):
        pieces = []
        period = ord('.')
        for byte in input:
            if self.i == 0 and byte == period:
                # Variable-length mode: a period ends the pending word, and
                # is simply dropped when nothing is pending.
                if self.buffer:
                    pieces.append(self.process_word())
                continue
            self.buffer.append(byte)
            # Fixed-length mode: the word ends after self.i bytes.
            if self.i != 0 and len(self.buffer) == self.i:
                pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        word = self.buffer
        tag = word[0]
        text = ''
        if tag == ord('i'):
            self.i = min(99, int(word[1:] or 0))  # set input length
        elif tag == ord('o'):
            self.o = min(99, int(word[1:] or 0))  # set output length
        else:
            text = word.decode('ascii')
            width = self.o
            if len(text) < width:
                text += '-' * width  # pad out with hyphens
            if width:
                text = text[:width]  # truncate to output length
            text += '.'
        self.buffer = bytearray()
        return text

    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        # Codec search function: only answers for 'test_decoder', and only
        # while the class-level switch is on.
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
2084
# Register the lookup function of the decoder defined above as a codec
# search function. It only resolves the name 'test_decoder' while
# StatefulIncrementalDecoder.codecEnabled is true, so the codec is
# disabled by default; tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2088
2089
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Sanity checks for the StatefulIncrementalDecoder test helper itself.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.'),
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            decoded = decoder.decode(data, final)
            self.assertEqual(decoded, expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002132
2133class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002134
def setUp(self):
    # Sample bytes mixing \r\n, \r and \n line endings, plus the text the
    # same data is expected to decode to once every ending becomes "\n".
    mixed_endings = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    newline_only = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.testdata = mixed_endings
    self.normalized = newline_only.decode("ascii")
    # Start from a clean slate: remove any leftover scratch file.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002139
def tearDown(self):
    # Remove the scratch file a test may have created.
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002142
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002143 def test_constructor(self):
2144 r = self.BytesIO(b"\xc3\xa9\n\n")
2145 b = self.BufferedReader(r, 1000)
2146 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002147 t.__init__(b, encoding="latin-1", newline="\r\n")
2148 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002149 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002150 t.__init__(b, encoding="utf-8", line_buffering=True)
2151 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002152 self.assertEqual(t.line_buffering, True)
2153 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002154 self.assertRaises(TypeError, t.__init__, b, newline=42)
2155 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2156
Nick Coghlana9b15242014-02-04 22:11:18 +10002157 def test_non_text_encoding_codecs_are_rejected(self):
2158 # Ensure the constructor complains if passed a codec that isn't
2159 # marked as a text encoding
2160 # http://bugs.python.org/issue20404
2161 r = self.BytesIO()
2162 b = self.BufferedWriter(r)
2163 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2164 self.TextIOWrapper(b, encoding="hex")
2165
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002166 def test_detach(self):
2167 r = self.BytesIO()
2168 b = self.BufferedWriter(r)
2169 t = self.TextIOWrapper(b)
2170 self.assertIs(t.detach(), b)
2171
2172 t = self.TextIOWrapper(b, encoding="ascii")
2173 t.write("howdy")
2174 self.assertFalse(r.getvalue())
2175 t.detach()
2176 self.assertEqual(r.getvalue(), b"howdy")
2177 self.assertRaises(ValueError, t.detach)
2178
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002179 # Operations independent of the detached stream should still work
2180 repr(t)
2181 self.assertEqual(t.encoding, "ascii")
2182 self.assertEqual(t.errors, "strict")
2183 self.assertFalse(t.line_buffering)
2184
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002185 def test_repr(self):
2186 raw = self.BytesIO("hello".encode("utf-8"))
2187 b = self.BufferedReader(raw)
2188 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002189 modname = self.TextIOWrapper.__module__
2190 self.assertEqual(repr(t),
2191 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2192 raw.name = "dummy"
2193 self.assertEqual(repr(t),
2194 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002195 t.mode = "r"
2196 self.assertEqual(repr(t),
2197 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002198 raw.name = b"dummy"
2199 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002200 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002201
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002202 t.buffer.detach()
2203 repr(t) # Should not raise an exception
2204
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002205 def test_line_buffering(self):
2206 r = self.BytesIO()
2207 b = self.BufferedWriter(r, 1000)
2208 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002209 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002210 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002211 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002212 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002213 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002214 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002215
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002216 def test_default_encoding(self):
2217 old_environ = dict(os.environ)
2218 try:
2219 # try to get a user preferred encoding different than the current
2220 # locale encoding to check that TextIOWrapper() uses the current
2221 # locale encoding and not the user preferred encoding
2222 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2223 if key in os.environ:
2224 del os.environ[key]
2225
2226 current_locale_encoding = locale.getpreferredencoding(False)
2227 b = self.BytesIO()
2228 t = self.TextIOWrapper(b)
2229 self.assertEqual(t.encoding, current_locale_encoding)
2230 finally:
2231 os.environ.clear()
2232 os.environ.update(old_environ)
2233
@support.cpython_only
def test_device_encoding(self):
    # Issue 15989
    import _testcapi
    stream = self.BytesIO()
    # fileno() values just past INT_MAX and just past UINT_MAX must both
    # make the constructor raise OverflowError.
    for too_big in (_testcapi.INT_MAX + 1, _testcapi.UINT_MAX + 1):
        stream.fileno = lambda value=too_big: value
        self.assertRaises(OverflowError, self.TextIOWrapper, stream)
2243
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002244 def test_encoding(self):
2245 # Check the encoding attribute is always set, and valid
2246 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002247 t = self.TextIOWrapper(b, encoding="utf-8")
2248 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002249 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002250 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002251 codecs.lookup(t.encoding)
2252
2253 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002254 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002255 b = self.BytesIO(b"abc\n\xff\n")
2256 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002257 self.assertRaises(UnicodeError, t.read)
2258 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002259 b = self.BytesIO(b"abc\n\xff\n")
2260 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002261 self.assertRaises(UnicodeError, t.read)
2262 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002263 b = self.BytesIO(b"abc\n\xff\n")
2264 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002265 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002266 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002267 b = self.BytesIO(b"abc\n\xff\n")
2268 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002269 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002270
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002271 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002272 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002273 b = self.BytesIO()
2274 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002275 self.assertRaises(UnicodeError, t.write, "\xff")
2276 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002277 b = self.BytesIO()
2278 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002279 self.assertRaises(UnicodeError, t.write, "\xff")
2280 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002281 b = self.BytesIO()
2282 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002283 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002284 t.write("abc\xffdef\n")
2285 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002286 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002287 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002288 b = self.BytesIO()
2289 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002290 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002291 t.write("abc\xffdef\n")
2292 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002293 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002294
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002295 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002296 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2297
2298 tests = [
2299 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002300 [ '', input_lines ],
2301 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2302 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2303 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002304 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002305 encodings = (
2306 'utf-8', 'latin-1',
2307 'utf-16', 'utf-16-le', 'utf-16-be',
2308 'utf-32', 'utf-32-le', 'utf-32-be',
2309 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002310
Guido van Rossum8358db22007-08-18 21:39:55 +00002311 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002312 # character in TextIOWrapper._pending_line.
2313 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002314 # XXX: str.encode() should return bytes
2315 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002316 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002317 for bufsize in range(1, 10):
2318 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002319 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2320 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002321 encoding=encoding)
2322 if do_reads:
2323 got_lines = []
2324 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002325 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002326 if c2 == '':
2327 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002328 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002329 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002330 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002331 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002332
2333 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002334 self.assertEqual(got_line, exp_line)
2335 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002336
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002337 def test_newlines_input(self):
2338 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002339 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2340 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002341 (None, normalized.decode("ascii").splitlines(keepends=True)),
2342 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002343 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2344 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2345 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002346 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002347 buf = self.BytesIO(testdata)
2348 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002349 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002350 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002351 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002352
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002353 def test_newlines_output(self):
2354 testdict = {
2355 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2356 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2357 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2358 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2359 }
2360 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2361 for newline, expected in tests:
2362 buf = self.BytesIO()
2363 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2364 txt.write("AAA\nB")
2365 txt.write("BB\nCCC\n")
2366 txt.write("X\rY\r\nZ")
2367 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002368 self.assertEqual(buf.closed, False)
2369 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002370
def test_destructor(self):
    seen_at_close = []
    base = self.BytesIO
    class MyBytesIO(base):
        def close(self):
            # Record what the stream holds at the moment it is closed.
            seen_at_close.append(self.getvalue())
            base.close(self)
    raw = MyBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    # Drop the only reference and collect; close() must have seen "abc".
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002384
def test_override_destructor(self):
    record = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            # Chain to the parent's __del__ only if it defines one.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()
        def close(self):
            record.append(2)
            super().close()
        def flush(self):
            record.append(3)
            super().flush()
    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    # Expected call order: __del__, then close, then flush.
    self.assertEqual(record, [1, 2, 3])
2407
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        # Nothing was printed; there is nothing more to check.
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception OSError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00002422
Guido van Rossum9b76da62007-04-11 01:09:03 +00002423 # Systematic tests of the text I/O API
2424
def test_basic_io(self):
    # Write, read, seek and tell on a real file for several chunk sizes
    # and encodings.  The files are opened with ``with`` so that a failing
    # assertion cannot leave the scratch file open.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # "utf-16-be" and "utf-16-le" are deliberately not exercised here.
        for enc in "ascii", "latin-1", "utf-8":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Opaque position at end of the three characters.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2453
def multi_line_test(self, f, enc):
    # Start from an empty stream.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    sample_len = len(sample)

    # Write lines of various lengths, remembering tell() before each one.
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        body = "".join([sample[i % sample_len] for i in range(size)])
        line = body + "\n"
        written.append((f.tell(), line))
        f.write(line)

    # Read everything back, again remembering tell() before each line.
    f.seek(0)
    read_back = []
    while True:
        position = f.tell()
        line = f.readline()
        if not line:
            break
        read_back.append((position, line))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002475
def test_telling(self):
    # tell() must report consistent positions while writing and while
    # reading with readline(), and must raise OSError while the file is
    # being iterated.  The file is opened with ``with`` so that a failing
    # assertion cannot leave the scratch file open.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is refused in the middle of iteration.
            self.assertRaises(OSError, f.tell)
        # Once iteration has finished, tell() works again.
        self.assertEqual(f.tell(), p2)
2495
def test_seeking(self):
    chunk_size = _default_chunk_size()
    # ASCII prefix two characters shorter than the chunk size, followed by
    # a non-ASCII character and a newline.
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = u_suffix.encode("utf-8")
    two_lines = (prefix + suffix) * 2
    with self.open(support.TESTFN, "wb") as f:
        f.write(two_lines)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002512
def test_seeking_too(self):
    # Regression test for a specific bug
    scratch = support.TESTFN
    with self.open(scratch, "wb") as f:
        f.write(b'\xe0\xbf\xbf\n')
    with self.open(scratch, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        # Neither call's result is checked; they just must not raise.
        f.readline()
        f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002523
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened with ``with`` so that a failing assertion
        # cannot leave the scratch file open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1): # seek positions
            for j in [1, 5, len(decoded) - i]: # read lengths
                with self.open(support.TESTFN, encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must replay the rest.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(case_input)

        # Position each test case so that it crosses a chunk boundary.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(case_input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + case_input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002570
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002571 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002572 data = "1234567890"
2573 tests = ("utf-16",
2574 "utf-16-le",
2575 "utf-16-be",
2576 "utf-32",
2577 "utf-32-le",
2578 "utf-32-be")
2579 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002580 buf = self.BytesIO()
2581 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002582 # Check if the BOM is written only once (see issue1753).
2583 f.write(data)
2584 f.write(data)
2585 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002586 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002587 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002588 self.assertEqual(f.read(), data * 2)
2589 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002590
Benjamin Petersona1b49012009-03-31 23:11:32 +00002591 def test_unreadable(self):
2592 class UnReadable(self.BytesIO):
2593 def readable(self):
2594 return False
2595 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002596 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002597
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002598 def test_read_one_by_one(self):
2599 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002600 reads = ""
2601 while True:
2602 c = txt.read(1)
2603 if not c:
2604 break
2605 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002606 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002607
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002608 def test_readlines(self):
2609 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2610 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2611 txt.seek(0)
2612 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2613 txt.seek(0)
2614 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2615
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002616 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002617 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002618 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002619 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002620 reads = ""
2621 while True:
2622 c = txt.read(128)
2623 if not c:
2624 break
2625 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002626 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002627
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002628 def test_writelines(self):
2629 l = ['ab', 'cd', 'ef']
2630 buf = self.BytesIO()
2631 txt = self.TextIOWrapper(buf)
2632 txt.writelines(l)
2633 txt.flush()
2634 self.assertEqual(buf.getvalue(), b'abcdef')
2635
2636 def test_writelines_userlist(self):
2637 l = UserList(['ab', 'cd', 'ef'])
2638 buf = self.BytesIO()
2639 txt = self.TextIOWrapper(buf)
2640 txt.writelines(l)
2641 txt.flush()
2642 self.assertEqual(buf.getvalue(), b'abcdef')
2643
2644 def test_writelines_error(self):
2645 txt = self.TextIOWrapper(self.BytesIO())
2646 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2647 self.assertRaises(TypeError, txt.writelines, None)
2648 self.assertRaises(TypeError, txt.writelines, b'abc')
2649
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002650 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002651 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002652
2653 # read one char at a time
2654 reads = ""
2655 while True:
2656 c = txt.read(1)
2657 if not c:
2658 break
2659 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002660 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002661
2662 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002663 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002664 txt._CHUNK_SIZE = 4
2665
2666 reads = ""
2667 while True:
2668 c = txt.read(4)
2669 if not c:
2670 break
2671 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002672 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002673
2674 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002675 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002676 txt._CHUNK_SIZE = 4
2677
2678 reads = txt.read(4)
2679 reads += txt.read(4)
2680 reads += txt.readline()
2681 reads += txt.readline()
2682 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002683 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002684
2685 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002686 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002687 txt._CHUNK_SIZE = 4
2688
2689 reads = txt.read(4)
2690 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002691 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002692
2693 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002694 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002695 txt._CHUNK_SIZE = 4
2696
2697 reads = txt.read(4)
2698 pos = txt.tell()
2699 txt.seek(0)
2700 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002701 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002702
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002703 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002704 buffer = self.BytesIO(self.testdata)
2705 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002706
2707 self.assertEqual(buffer.seekable(), txt.seekable())
2708
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
            # The position is not used here; tell() is still called.
            f.tell()
        with self.open(path, 'rb') as f:
            on_disk = f.read()
            self.assertEqual(on_disk, 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(path, 'rb') as f:
            on_disk = f.read()
            self.assertEqual(on_disk, 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002723
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
            end_of_text = f.tell()
        with self.open(path, 'r+', encoding=charset) as f:
            # Write past the existing text first, then overwrite the
            # start of the file.
            f.seek(end_of_text)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(path, 'rb') as f:
            on_disk = f.read()
            self.assertEqual(on_disk, 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002738
def test_errors_property(self):
    # (extra open() keyword arguments, expected value of f.errors) pairs:
    # the default first, then an explicit handler.
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra, expected in cases:
        with self.open(support.TESTFN, "w", **extra) as f:
            self.assertEqual(f.errors, expected)
2744
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    thread_count = 20
    go = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def write_line(index):
            line = "Thread%03d\n" % index
            # Hold every writer back until all threads are running.
            go.wait()
            f.write(line)
        workers = []
        for index in range(thread_count):
            workers.append(threading.Thread(target=write_line, args=(index,)))
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        go.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        written = f.read()
        # Every thread's line must appear exactly once.
        for index in range(thread_count):
            occurrences = written.count("Thread%03d\n" % index)
            self.assertEqual(occurrences, 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002767
Antoine Pitrou6be88762010-05-03 16:48:20 +00002768 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002769 # Test that text file is closed despite failed flush
2770 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00002771 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002772 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00002773 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002774 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002775 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002776 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002777 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002778 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002779 self.assertTrue(txt.buffer.closed)
2780 self.assertTrue(closed) # flush() called
2781 self.assertFalse(closed[0]) # flush() called before file closed
2782 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02002783 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00002784
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03002785 def test_close_error_on_close(self):
2786 buffer = self.BytesIO(self.testdata)
2787 def bad_flush():
2788 raise OSError('flush')
2789 def bad_close():
2790 raise OSError('close')
2791 buffer.close = bad_close
2792 txt = self.TextIOWrapper(buffer, encoding="ascii")
2793 txt.flush = bad_flush
2794 with self.assertRaises(OSError) as err: # exception not swallowed
2795 txt.close()
2796 self.assertEqual(err.exception.args, ('close',))
2797 self.assertIsInstance(err.exception.__context__, OSError)
2798 self.assertEqual(err.exception.__context__.args, ('flush',))
2799 self.assertFalse(txt.closed)
2800
2801 def test_nonnormalized_close_error_on_close(self):
2802 # Issue #21677
2803 buffer = self.BytesIO(self.testdata)
2804 def bad_flush():
2805 raise non_existing_flush
2806 def bad_close():
2807 raise non_existing_close
2808 buffer.close = bad_close
2809 txt = self.TextIOWrapper(buffer, encoding="ascii")
2810 txt.flush = bad_flush
2811 with self.assertRaises(NameError) as err: # exception not swallowed
2812 txt.close()
2813 self.assertIn('non_existing_close', str(err.exception))
2814 self.assertIsInstance(err.exception.__context__, NameError)
2815 self.assertIn('non_existing_flush', str(err.exception.__context__))
2816 self.assertFalse(txt.closed)
2817
Antoine Pitrou6be88762010-05-03 16:48:20 +00002818 def test_multi_close(self):
2819 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2820 txt.close()
2821 txt.close()
2822 txt.close()
2823 self.assertRaises(ValueError, txt.flush)
2824
def test_unseekable(self):
    # tell() and seek() on a wrapper around an unseekable stream must
    # raise UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
2829
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002830 def test_readonly_attributes(self):
2831 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2832 buf = self.BytesIO(self.testdata)
2833 with self.assertRaises(AttributeError):
2834 txt.buffer = buf
2835
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    raw = self.MockRawIO(chunks)
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
    # Reads
    first_four = wrapper.read(4)
    self.assertEqual(first_four, 'abcd')
    rest_of_line = wrapper.readline()
    self.assertEqual(rest_of_line, 'efghi\n')
    remaining = [line for line in wrapper]
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
2846
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    # No flush() was issued, yet everything reached the raw object.
    written = b''.join(raw._write_stack)
    self.assertEqual(written, b'123\n45')
2856
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02002857 def test_bufio_write_through(self):
2858 # Issue #21396: write_through=True doesn't force a flush()
2859 # on the underlying binary buffered object.
2860 flush_called, write_called = [], []
2861 class BufferedWriter(self.BufferedWriter):
2862 def flush(self, *args, **kwargs):
2863 flush_called.append(True)
2864 return super().flush(*args, **kwargs)
2865 def write(self, *args, **kwargs):
2866 write_called.append(True)
2867 return super().write(*args, **kwargs)
2868
2869 rawio = self.BytesIO()
2870 data = b"a"
2871 bufio = BufferedWriter(rawio, len(data)*2)
2872 textio = self.TextIOWrapper(bufio, encoding='ascii',
2873 write_through=True)
2874 # write to the buffered io but don't overflow the buffer
2875 text = data.decode('ascii')
2876 textio.write(text)
2877
2878 # buffer.flush is not called with write_through=True
2879 self.assertFalse(flush_called)
2880 # buffer.write *is* called with write_through=True
2881 self.assertTrue(write_called)
2882 self.assertEqual(rawio.getvalue(), b"") # no flush
2883
2884 write_called = [] # reset
2885 textio.write(text * 10) # total content is larger than bufio buffer
2886 self.assertTrue(write_called)
2887 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
2888
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002889 def test_read_nonbytes(self):
2890 # Issue #17106
2891 # Crash when underlying read() returns non-bytes
2892 t = self.TextIOWrapper(self.StringIO('a'))
2893 self.assertRaises(TypeError, t.read, 1)
2894 t = self.TextIOWrapper(self.StringIO('a'))
2895 self.assertRaises(TypeError, t.readline)
2896 t = self.TextIOWrapper(self.StringIO('a'))
2897 self.assertRaises(TypeError, t.read)
2898
2899 def test_illegal_decoder(self):
2900 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10002901 # Bypass the early encoding check added in issue 20404
2902 def _make_illegal_wrapper():
2903 quopri = codecs.lookup("quopri")
2904 quopri._is_text_encoding = True
2905 try:
2906 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2907 newline='\n', encoding="quopri")
2908 finally:
2909 quopri._is_text_encoding = False
2910 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002911 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10002912 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002913 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10002914 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002915 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10002916 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002917 self.assertRaises(TypeError, t.read)
2918
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    #
    # Helper: formats a small script that imports the module named by
    # self.io.__name__ as ``io`` and defines a class whose __del__ builds
    # an io.TextIOWrapper with the given keyword arguments and prints
    # "ok".  The script is handed to assert_python_ok("-c", ...) and that
    # call's result is returned (callers unpack it as rc, out, err).
    iomod = self.io.__name__
    # The repr of the kwargs dict is substituted into ``**{kwargs}``;
    # the leading "if 1:" lets the script body stay indented.
    code = """if 1:
        import codecs
        import {iomod} as io

        # Avoid looking up codecs at shutdown
        codecs.lookup('utf-8')

        class C:
            def __init__(self):
                self.buf = io.BytesIO()
            def __del__(self):
                io.TextIOWrapper(self.buf, **{kwargs})
                print("ok")
        c = C()
        """.format(iomod=iomod, kwargs=kwargs)
    return assert_python_ok("-c", code)
2939
2940 def test_create_at_shutdown_without_encoding(self):
2941 rc, out, err = self._check_create_at_shutdown()
2942 if err:
2943 # Can error out with a RuntimeError if the module state
2944 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10002945 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01002946 else:
2947 self.assertEqual("ok", out.decode().strip())
2948
2949 def test_create_at_shutdown_with_encoding(self):
2950 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
2951 errors='strict')
2952 self.assertFalse(err)
2953 self.assertEqual("ok", out.decode().strip())
2954
def test_read_byteslike(self):
    # Reading through a buffer whose read methods return memoryviews
    # instead of bytes must still decode correctly.
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    truncated = _to_memoryview(source.getvalue()).tobytes()
    expected_text = truncated.decode('utf-8')

    self.assertEqual(wrapper.read(200), expected_text)
2965
Benjamin Peterson6c14f232014-11-12 10:19:46 -05002966 def test_issue22849(self):
2967 class F(object):
2968 def readable(self): return True
2969 def writable(self): return True
2970 def seekable(self): return True
2971
2972 for i in range(10):
2973 try:
2974 self.TextIOWrapper(F(), encoding='utf-8')
2975 except Exception:
2976 pass
2977
2978 F.tell = lambda x: 0
2979 t = self.TextIOWrapper(F(), encoding='utf-8')
2980
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002981
class MemviewBytesIO(io.BytesIO):
    '''BytesIO variant whose read() and read1() hand back memoryview
       objects instead of bytes'''

    def read(self, len_):
        data = super().read(len_)
        return _to_memoryview(data)

    def read1(self, len_):
        data = super().read1(len_)
        return _to_memoryview(data)
2991
2992def _to_memoryview(buf):
2993 '''Convert bytes-object *buf* to a non-trivial memoryview'''
2994
2995 arr = array.array('i')
2996 idx = len(buf) - len(buf) % arr.itemsize
2997 arr.frombytes(buf[:idx])
2998 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002999
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003000
class CTextIOWrapperTest(TextIOWrapperTest):
    # Binds the shared TextIOWrapper tests to the ``io`` module and adds
    # tests specific to that implementation.
    io = io
    # Text looked for on stderr by test_create_at_shutdown_without_encoding.
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        # A failed re-initialization must leave the wrapper unusable
        # (ValueError on read) rather than in a half-built state.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.read)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
        self.assertRaises(ValueError, t.read)

        # An object created with __new__ only (no __init__) must raise
        # from repr().
        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, t)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            rawio = io.FileIO(support.TESTFN, "wb")
            b = self.BufferedWriter(rawio)
            t = self.TextIOWrapper(b, encoding="ascii")
            t.write("456def")
            # Self-reference so only the cyclic GC can reclaim the wrapper.
            t.x = t
            wr = weakref.ref(t)
            del t
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for i in range(1000):
            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t1 = self.TextIOWrapper(b1, encoding="ascii")
            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t2 = self.TextIOWrapper(b2, encoding="ascii")
            # circular references
            t1.buddy = t2
            t2.buddy = t1
        support.gc_collect()
3047
3048
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Binds the shared TextIOWrapper tests to the module named ``pyio``.

    # Text looked for on stderr by test_create_at_shutdown_without_encoding.
    # (A previously expected value, kept for reference:
    #  "LookupError: unknown encoding: ascii")
    shutdown_error = "TypeError: 'NoneType' object is not iterable"
    io = pyio
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003053
3054
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Shared checks for self.IncrementalNewlineDecoder.  That attribute is
    # not defined in this class; presumably it is supplied per
    # implementation elsewhere in the module -- TODO confirm.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            # Decoding, restoring the saved state and decoding the same
            # input again must give the same result both times.
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        # A complete three-byte sequence decodes at once.
        _check_decode(b'\xe8\xa2\x88', "\u8888")

        # Fed byte by byte, nothing is produced until the last byte.
        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        # A truncated sequence is an error once final=True is passed.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        # A lone '\r' is held back until the next input (or final=True)
        # shows whether it is part of '\r\n'.
        _check_decode(b'\n', "\n")
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        # Multi-byte characters combined with newline translation.
        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to *decoder* one unit at a time and check both the
        # translated output and the ``newlines`` attribute along the way.
        # *encoding* is None when the decoder takes str input directly.
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(bytes([b])))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertEqual(decoder.newlines, None)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        # The trailing '\r' is still pending, so it is absent from result.
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        # reset() must also clear the recorded newline kinds.
        decoder.reset()
        input = "abc"
        if encoder is not None:
            encoder.reset()
            input = encoder.encode(input)
        self.assertEqual(decoder.decode(input), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            # ``enc and ...`` leaves decoder as None when enc is None.
            decoder = enc and codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
            self.check_newline_decoding(decoder, enc)
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 must pass through unchanged and must not
            # be recorded as newlines.
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
            self.assertEqual(dec.newlines, None)
        dec = self.IncrementalNewlineDecoder(None, translate=False)
        _check(dec)
        dec = self.IncrementalNewlineDecoder(None, translate=True)
        _check(dec)
3160
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; every test is inherited from
    # IncrementalNewlineDecoderTest.
    ...
3163
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; every test is inherited from
    # IncrementalNewlineDecoderTest.
    ...
Antoine Pitrou180a3362008-12-14 16:36:46 +00003166
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003167
Guido van Rossum01a27522007-03-07 01:00:12 +00003168# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003169
Guido van Rossum5abbf752007-08-27 17:39:33 +00003170class MiscIOTest(unittest.TestCase):
3171
Barry Warsaw40e82462008-11-20 20:14:50 +00003172 def tearDown(self):
3173 support.unlink(support.TESTFN)
3174
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003175 def test___all__(self):
3176 for name in self.io.__all__:
3177 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003178 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003179 if name == "open":
3180 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003181 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003182 self.assertTrue(issubclass(obj, Exception), name)
3183 elif not name.startswith("SEEK_"):
3184 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003185
Barry Warsaw40e82462008-11-20 20:14:50 +00003186 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003187 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003188 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003189 f.close()
3190
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003191 with support.check_warnings(('', DeprecationWarning)):
3192 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003193 self.assertEqual(f.name, support.TESTFN)
3194 self.assertEqual(f.buffer.name, support.TESTFN)
3195 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3196 self.assertEqual(f.mode, "U")
3197 self.assertEqual(f.buffer.mode, "rb")
3198 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003199 f.close()
3200
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003201 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003202 self.assertEqual(f.mode, "w+")
3203 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3204 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003205
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003206 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003207 self.assertEqual(g.mode, "wb")
3208 self.assertEqual(g.raw.mode, "wb")
3209 self.assertEqual(g.name, f.fileno())
3210 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003211 f.close()
3212 g.close()
3213
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003214 def test_io_after_close(self):
3215 for kwargs in [
3216 {"mode": "w"},
3217 {"mode": "wb"},
3218 {"mode": "w", "buffering": 1},
3219 {"mode": "w", "buffering": 2},
3220 {"mode": "wb", "buffering": 0},
3221 {"mode": "r"},
3222 {"mode": "rb"},
3223 {"mode": "r", "buffering": 1},
3224 {"mode": "r", "buffering": 2},
3225 {"mode": "rb", "buffering": 0},
3226 {"mode": "w+"},
3227 {"mode": "w+b"},
3228 {"mode": "w+", "buffering": 1},
3229 {"mode": "w+", "buffering": 2},
3230 {"mode": "w+b", "buffering": 0},
3231 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003232 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003233 f.close()
3234 self.assertRaises(ValueError, f.flush)
3235 self.assertRaises(ValueError, f.fileno)
3236 self.assertRaises(ValueError, f.isatty)
3237 self.assertRaises(ValueError, f.__iter__)
3238 if hasattr(f, "peek"):
3239 self.assertRaises(ValueError, f.peek, 1)
3240 self.assertRaises(ValueError, f.read)
3241 if hasattr(f, "read1"):
3242 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003243 if hasattr(f, "readall"):
3244 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003245 if hasattr(f, "readinto"):
3246 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003247 if hasattr(f, "readinto1"):
3248 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003249 self.assertRaises(ValueError, f.readline)
3250 self.assertRaises(ValueError, f.readlines)
3251 self.assertRaises(ValueError, f.seek, 0)
3252 self.assertRaises(ValueError, f.tell)
3253 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003254 self.assertRaises(ValueError, f.write,
3255 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003256 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003257 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003258
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003259 def test_blockingioerror(self):
3260 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003261 class C(str):
3262 pass
3263 c = C("")
3264 b = self.BlockingIOError(1, c)
3265 c.b = b
3266 b.c = c
3267 wr = weakref.ref(c)
3268 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003269 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003270 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003271
3272 def test_abcs(self):
3273 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003274 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3275 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3276 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3277 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003278
3279 def _check_abc_inheritance(self, abcmodule):
3280 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003281 self.assertIsInstance(f, abcmodule.IOBase)
3282 self.assertIsInstance(f, abcmodule.RawIOBase)
3283 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3284 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003285 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003286 self.assertIsInstance(f, abcmodule.IOBase)
3287 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3288 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3289 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003290 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003291 self.assertIsInstance(f, abcmodule.IOBase)
3292 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3293 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3294 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003295
3296 def test_abc_inheritance(self):
3297 # Test implementations inherit from their respective ABCs
3298 self._check_abc_inheritance(self)
3299
3300 def test_abc_inheritance_official(self):
3301 # Test implementations inherit from the official ABCs of the
3302 # baseline "io" module.
3303 self._check_abc_inheritance(io)
3304
Antoine Pitroue033e062010-10-29 10:38:18 +00003305 def _check_warn_on_dealloc(self, *args, **kwargs):
3306 f = open(*args, **kwargs)
3307 r = repr(f)
3308 with self.assertWarns(ResourceWarning) as cm:
3309 f = None
3310 support.gc_collect()
3311 self.assertIn(r, str(cm.warning.args[0]))
3312
3313 def test_warn_on_dealloc(self):
3314 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3315 self._check_warn_on_dealloc(support.TESTFN, "wb")
3316 self._check_warn_on_dealloc(support.TESTFN, "w")
3317
3318 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3319 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003320 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003321 for fd in fds:
3322 try:
3323 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003324 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003325 if e.errno != errno.EBADF:
3326 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003327 self.addCleanup(cleanup_fds)
3328 r, w = os.pipe()
3329 fds += r, w
3330 self._check_warn_on_dealloc(r, *args, **kwargs)
3331 # When using closefd=False, there's no warning
3332 r, w = os.pipe()
3333 fds += r, w
3334 with warnings.catch_warnings(record=True) as recorded:
3335 open(r, *args, closefd=False, **kwargs)
3336 support.gc_collect()
3337 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00003338
3339 def test_warn_on_dealloc_fd(self):
3340 self._check_warn_on_dealloc_fd("rb", buffering=0)
3341 self._check_warn_on_dealloc_fd("rb")
3342 self._check_warn_on_dealloc_fd("r")
3343
3344
Antoine Pitrou243757e2010-11-05 21:15:39 +00003345 def test_pickling(self):
3346 # Pickling file objects is forbidden
3347 for kwargs in [
3348 {"mode": "w"},
3349 {"mode": "wb"},
3350 {"mode": "wb", "buffering": 0},
3351 {"mode": "r"},
3352 {"mode": "rb"},
3353 {"mode": "rb", "buffering": 0},
3354 {"mode": "w+"},
3355 {"mode": "w+b"},
3356 {"mode": "w+b", "buffering": 0},
3357 ]:
3358 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3359 with self.open(support.TESTFN, **kwargs) as f:
3360 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3361
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003362 def test_nonblock_pipe_write_bigbuf(self):
3363 self._test_nonblock_pipe_write(16*1024)
3364
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003365 def test_nonblock_pipe_write_smallbuf(self):
3366 self._test_nonblock_pipe_write(1024)
3367
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003368 @unittest.skipUnless(hasattr(os, 'set_blocking'),
3369 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003370 def _test_nonblock_pipe_write(self, bufsize):
3371 sent = []
3372 received = []
3373 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003374 os.set_blocking(r, False)
3375 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003376
3377 # To exercise all code paths in the C implementation we need
3378 # to play with buffer sizes. For instance, if we choose a
3379 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3380 # then we will never get a partial write of the buffer.
3381 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3382 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3383
3384 with rf, wf:
3385 for N in 9999, 73, 7574:
3386 try:
3387 i = 0
3388 while True:
3389 msg = bytes([i % 26 + 97]) * N
3390 sent.append(msg)
3391 wf.write(msg)
3392 i += 1
3393
3394 except self.BlockingIOError as e:
3395 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003396 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003397 sent[-1] = sent[-1][:e.characters_written]
3398 received.append(rf.read())
3399 msg = b'BLOCKED'
3400 wf.write(msg)
3401 sent.append(msg)
3402
3403 while True:
3404 try:
3405 wf.flush()
3406 break
3407 except self.BlockingIOError as e:
3408 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003409 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003410 self.assertEqual(e.characters_written, 0)
3411 received.append(rf.read())
3412
3413 received += iter(rf.read, None)
3414
3415 sent, received = b''.join(sent), b''.join(received)
3416 self.assertTrue(sent == received)
3417 self.assertTrue(wf.closed)
3418 self.assertTrue(rf.closed)
3419
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003420 def test_create_fail(self):
3421 # 'x' mode fails if file is existing
3422 with self.open(support.TESTFN, 'w'):
3423 pass
3424 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3425
def test_create_writes(self):
    # A file opened with 'x' is writable: write through it, then reopen
    # the same path for reading and compare the contents.
    with self.open(support.TESTFN, 'xb') as created:
        created.write(b"spam")
    with self.open(support.TESTFN, 'rb') as reopened:
        contents = reopened.read()
    self.assertEqual(b"spam", contents)
3432
def test_open_allargs(self):
    # There used to be a buffer overflow in the parser for rawmode, so a
    # mode string combining every flag must be rejected with ValueError.
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, 'rwax+')
3436
3437
class CMiscIOTest(MiscIOTest):
    # Runs the inherited MiscIOTest cases with ``self.io`` bound to the
    # `io` module, plus one check that is only defined for this variant.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025: readinto() relies on read(); when read() hands back
        # far more data than the destination can hold, readinto() must
        # raise ValueError.
        class BadReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                # Ignore the requested size and return one million bytes.
                return b'x' * 10**6

        oversized_reader = BadReader()
        destination = bytearray(2)
        with self.assertRaises(ValueError):
            oversized_reader.readinto(destination)
3449
class PyMiscIOTest(MiscIOTest):
    # Runs the inherited MiscIOTest cases with ``self.io`` bound to the
    # `pyio` module.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003452
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003453
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    # Checks how file objects opened on a pipe behave when a blocking read
    # or write is interrupted by SIGALRM.  The implementation under test is
    # read from ``self.io``, which concrete subclasses must provide.
    #
    # Every helper that arms signal.alarm() also cancels it on the way out,
    # so that a failure cannot leave a pending SIGALRM to fire after
    # tearDown() has put the previous handler back.

    def setUp(self):
        # Install a raising handler and remember the one it replaces.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        # Restore whatever handler was active before setUp().
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Signal handler: deliberately raises ZeroDivisionError so tests can
        # observe that the handler ran.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data that gets repeated to build the payload,
        *bytes* is its expected on-the-wire form (the first two bytes are
        compared against what is read back from the pipe), and
        *fdopen_kwargs* are passed to ``self.io.open()`` for the write end.
        """
        read_results = []
        def _read():
            # Keep SIGALRM out of this helper thread where the platform
            # allows it (presumably so the signal interrupts the thread
            # doing the write instead).
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        # The descriptors are closed by hand in the finally clause below.
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block so that the cleanup code can tell
        # whether open() succeeded instead of failing on an unbound name.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed just below.  Since
            # the other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    # Issue #22331: The test hangs on FreeBSD 7.2
    @support.requires_freebsd_version(8)
    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check what happens when a signal handler writes to a file object
        that is itself in the middle of a write.

        *data* is written repeatedly until SIGALRM fires; *fdopen_kwargs*
        are passed to ``self.io.open()`` for the pipe's write end.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel the alarm in case the loop ended for some other reason
            # before it fired.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by ``read()`` to ``str`` for
        the comparison; *fdopen_kwargs* are passed to ``self.io.open()``
        for the pipe's read end.
        """
        r, w = os.pipe()
        # The descriptors are closed by hand in the finally clause below.
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            # Supply the second half of the data from inside the handler.
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that the cleanup code can tell
        # whether open() succeeded instead of failing on an unbound name.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel the alarm in case read() failed before it fired.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is a one-element str or bytes value that is repeated to
        build the payload; *fdopen_kwargs* are passed to ``self.io.open()``
        for the pipe's write end.
        """
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        # The descriptors are closed by hand in the finally clause below.
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        def _read():
            # Drain the pipe until the writer reports that it is done.
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: switch handlers and arm a second alarm.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start the reader so the write can complete.
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that the cleanup code can tell
        # whether open() succeeded instead of failing on an unbound name.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel whichever alarm may still be pending after a failure.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3655
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003656
class CSignalsTest(SignalsTest):
    # Runs the inherited SignalsTest cases with ``self.io`` bound to the
    # `io` module.
    io = io
3659
class PySignalsTest(SignalsTest):
    # Runs the inherited SignalsTest cases with ``self.io`` bound to the
    # `pyio` module.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
3667
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003668
def load_tests(*args):
    """Build the suite for this module (unittest's ``load_tests`` hook).

    Before collecting the tests, every listed test class whose name starts
    with "C" or "Py" gets the public names of the matching module (`io` or
    `pyio`) and the matching mock classes set as class attributes.  Classes
    with any other prefix are collected unchanged.  The arguments supplied
    by unittest are accepted but not used.

    Returns a ``unittest.TestSuite`` with one sub-suite per test class.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    # Each mock has "C"- and "Py"-prefixed variants defined at module level;
    # expose them to the tests under the unprefixed name.
    globs = globals()
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated (and removed in Python 3.13);
    # loadTestsFromTestCase() on the default loader collects the same
    # "test"-prefixed methods.
    loader = unittest.defaultTestLoader
    suite = unittest.TestSuite(
        [loader.loadTestsFromTestCase(test) for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003704
# Run the tests through unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()