blob: 416c547334edb5248bf8e16304563ba7aca3dcb1 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Antoine Pitrou25f85d42015-04-13 19:41:47 +020038from test.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010047try:
48 import fcntl
49except ImportError:
50 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000052def _default_chunk_size():
53 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000054 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000055 return f._CHUNK_SIZE
56
57
Antoine Pitrou328ec742010-09-14 18:37:24 +000058class MockRawIOWithoutRead:
59 """A RawIO implementation without read(), so as to exercise the default
60 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000061
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000062 def __init__(self, read_stack=()):
63 self._read_stack = list(read_stack)
64 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000065 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000066 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000067
Guido van Rossum01a27522007-03-07 01:00:12 +000068 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000069 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000070 return len(b)
71
72 def writable(self):
73 return True
74
Guido van Rossum68bbcd22007-02-27 17:19:33 +000075 def fileno(self):
76 return 42
77
78 def readable(self):
79 return True
80
Guido van Rossum01a27522007-03-07 01:00:12 +000081 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000082 return True
83
Guido van Rossum01a27522007-03-07 01:00:12 +000084 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000085 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000086
87 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000088 return 0 # same comment as above
89
90 def readinto(self, buf):
91 self._reads += 1
92 max_len = len(buf)
93 try:
94 data = self._read_stack[0]
95 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000096 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000097 return 0
98 if data is None:
99 del self._read_stack[0]
100 return None
101 n = len(data)
102 if len(data) <= max_len:
103 del self._read_stack[0]
104 buf[:n] = data
105 return n
106 else:
107 buf[:] = data[:max_len]
108 self._read_stack[0] = data[max_len:]
109 return max_len
110
111 def truncate(self, pos=None):
112 return pos
113
Antoine Pitrou328ec742010-09-14 18:37:24 +0000114class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
115 pass
116
117class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
118 pass
119
120
121class MockRawIO(MockRawIOWithoutRead):
122
123 def read(self, n=None):
124 self._reads += 1
125 try:
126 return self._read_stack.pop(0)
127 except:
128 self._extraneous_reads += 1
129 return b""
130
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000131class CMockRawIO(MockRawIO, io.RawIOBase):
132 pass
133
134class PyMockRawIO(MockRawIO, pyio.RawIOBase):
135 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000136
Guido van Rossuma9e20242007-03-08 00:43:48 +0000137
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000138class MisbehavedRawIO(MockRawIO):
139 def write(self, b):
140 return super().write(b) * 2
141
142 def read(self, n=None):
143 return super().read(n) * 2
144
145 def seek(self, pos, whence):
146 return -123
147
148 def tell(self):
149 return -456
150
151 def readinto(self, buf):
152 super().readinto(buf)
153 return len(buf) * 5
154
155class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
156 pass
157
158class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
159 pass
160
161
162class CloseFailureIO(MockRawIO):
163 closed = 0
164
165 def close(self):
166 if not self.closed:
167 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200168 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169
170class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
171 pass
172
173class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
174 pass
175
176
177class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000178
179 def __init__(self, data):
180 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000182
183 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000184 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000185 self.read_history.append(None if res is None else len(res))
186 return res
187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188 def readinto(self, b):
189 res = super().readinto(b)
190 self.read_history.append(res)
191 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000192
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000193class CMockFileIO(MockFileIO, io.BytesIO):
194 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000195
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000196class PyMockFileIO(MockFileIO, pyio.BytesIO):
197 pass
198
199
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000200class MockUnseekableIO:
201 def seekable(self):
202 return False
203
204 def seek(self, *args):
205 raise self.UnsupportedOperation("not seekable")
206
207 def tell(self, *args):
208 raise self.UnsupportedOperation("not seekable")
209
210class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
211 UnsupportedOperation = io.UnsupportedOperation
212
213class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
214 UnsupportedOperation = pyio.UnsupportedOperation
215
216
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000217class MockNonBlockWriterIO:
218
219 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000220 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000221 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000222
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000223 def pop_written(self):
224 s = b"".join(self._write_stack)
225 self._write_stack[:] = []
226 return s
227
228 def block_on(self, char):
229 """Block when a given char is encountered."""
230 self._blocker_char = char
231
232 def readable(self):
233 return True
234
235 def seekable(self):
236 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
Guido van Rossum01a27522007-03-07 01:00:12 +0000238 def writable(self):
239 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000240
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000241 def write(self, b):
242 b = bytes(b)
243 n = -1
244 if self._blocker_char:
245 try:
246 n = b.index(self._blocker_char)
247 except ValueError:
248 pass
249 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100250 if n > 0:
251 # write data up to the first blocker
252 self._write_stack.append(b[:n])
253 return n
254 else:
255 # cancel blocker and indicate would block
256 self._blocker_char = None
257 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000258 self._write_stack.append(b)
259 return len(b)
260
261class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
262 BlockingIOError = io.BlockingIOError
263
264class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
265 BlockingIOError = pyio.BlockingIOError
266
Guido van Rossuma9e20242007-03-08 00:43:48 +0000267
Guido van Rossum28524c72007-02-27 05:47:44 +0000268class IOTest(unittest.TestCase):
269
Neal Norwitze7789b12008-03-24 06:18:09 +0000270 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000271 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000272
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000273 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000274 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000275
Guido van Rossum28524c72007-02-27 05:47:44 +0000276 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000277 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000278 f.truncate(0)
279 self.assertEqual(f.tell(), 5)
280 f.seek(0)
281
282 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000283 self.assertEqual(f.seek(0), 0)
284 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000285 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000286 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000287 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000288 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000289 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000290 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000291 self.assertEqual(f.seek(-1, 2), 13)
292 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293
Guido van Rossum87429772007-04-10 21:06:59 +0000294 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000295 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000296 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000297
Guido van Rossum9b76da62007-04-11 01:09:03 +0000298 def read_ops(self, f, buffered=False):
299 data = f.read(5)
300 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000301 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000302 self.assertEqual(f.readinto(data), 5)
303 self.assertEqual(data, b" worl")
304 self.assertEqual(f.readinto(data), 2)
305 self.assertEqual(len(data), 5)
306 self.assertEqual(data[:2], b"d\n")
307 self.assertEqual(f.seek(0), 0)
308 self.assertEqual(f.read(20), b"hello world\n")
309 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000310 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000311 self.assertEqual(f.seek(-6, 2), 6)
312 self.assertEqual(f.read(5), b"world")
313 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000314 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 self.assertEqual(f.seek(-6, 1), 5)
316 self.assertEqual(f.read(5), b" worl")
317 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000318 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000319 if buffered:
320 f.seek(0)
321 self.assertEqual(f.read(), b"hello world\n")
322 f.seek(6)
323 self.assertEqual(f.read(), b"world\n")
324 self.assertEqual(f.read(), b"")
325
Guido van Rossum34d69e52007-04-10 20:08:41 +0000326 LARGE = 2**31
327
Guido van Rossum53807da2007-04-10 19:01:47 +0000328 def large_file_ops(self, f):
329 assert f.readable()
330 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000331 self.assertEqual(f.seek(self.LARGE), self.LARGE)
332 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000333 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000334 self.assertEqual(f.tell(), self.LARGE + 3)
335 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000336 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000337 self.assertEqual(f.tell(), self.LARGE + 2)
338 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000339 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000340 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000341 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
342 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000343 self.assertEqual(f.read(2), b"x")
344
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000345 def test_invalid_operations(self):
346 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000347 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000348 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000349 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000350 self.assertRaises(exc, fp.read)
351 self.assertRaises(exc, fp.readline)
352 with self.open(support.TESTFN, "wb", buffering=0) as fp:
353 self.assertRaises(exc, fp.read)
354 self.assertRaises(exc, fp.readline)
355 with self.open(support.TESTFN, "rb", buffering=0) as fp:
356 self.assertRaises(exc, fp.write, b"blah")
357 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000358 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000359 self.assertRaises(exc, fp.write, b"blah")
360 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000361 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000362 self.assertRaises(exc, fp.write, "blah")
363 self.assertRaises(exc, fp.writelines, ["blah\n"])
364 # Non-zero seeking from current or end pos
365 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
366 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000367
Antoine Pitrou13348842012-01-29 18:36:34 +0100368 def test_open_handles_NUL_chars(self):
369 fn_with_NUL = 'foo\0bar'
370 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
371 self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
372
Guido van Rossum28524c72007-02-27 05:47:44 +0000373 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000374 with self.open(support.TESTFN, "wb", buffering=0) as f:
375 self.assertEqual(f.readable(), False)
376 self.assertEqual(f.writable(), True)
377 self.assertEqual(f.seekable(), True)
378 self.write_ops(f)
379 with self.open(support.TESTFN, "rb", buffering=0) as f:
380 self.assertEqual(f.readable(), True)
381 self.assertEqual(f.writable(), False)
382 self.assertEqual(f.seekable(), True)
383 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000384
Guido van Rossum87429772007-04-10 21:06:59 +0000385 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000386 with self.open(support.TESTFN, "wb") as f:
387 self.assertEqual(f.readable(), False)
388 self.assertEqual(f.writable(), True)
389 self.assertEqual(f.seekable(), True)
390 self.write_ops(f)
391 with self.open(support.TESTFN, "rb") as f:
392 self.assertEqual(f.readable(), True)
393 self.assertEqual(f.writable(), False)
394 self.assertEqual(f.seekable(), True)
395 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000396
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000397 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000398 with self.open(support.TESTFN, "wb") as f:
399 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
400 with self.open(support.TESTFN, "rb") as f:
401 self.assertEqual(f.readline(), b"abc\n")
402 self.assertEqual(f.readline(10), b"def\n")
403 self.assertEqual(f.readline(2), b"xy")
404 self.assertEqual(f.readline(4), b"zzy\n")
405 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000406 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000407 self.assertRaises(TypeError, f.readline, 5.3)
408 with self.open(support.TESTFN, "r") as f:
409 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000410
Guido van Rossum28524c72007-02-27 05:47:44 +0000411 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000412 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000413 self.write_ops(f)
414 data = f.getvalue()
415 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000416 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000417 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000418
Guido van Rossum53807da2007-04-10 19:01:47 +0000419 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000420 # On Windows and Mac OSX this test comsumes large resources; It takes
421 # a long time to build the >2GB file and takes >2GB of disk space
422 # therefore the resource must be enabled to run this test.
423 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600424 support.requires(
425 'largefile',
426 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000427 with self.open(support.TESTFN, "w+b", 0) as f:
428 self.large_file_ops(f)
429 with self.open(support.TESTFN, "w+b") as f:
430 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000431
432 def test_with_open(self):
433 for bufsize in (0, 1, 100):
434 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000435 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000436 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000437 self.assertEqual(f.closed, True)
438 f = None
439 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000440 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000441 1/0
442 except ZeroDivisionError:
443 self.assertEqual(f.closed, True)
444 else:
445 self.fail("1/0 didn't raise an exception")
446
Antoine Pitrou08838b62009-01-21 00:55:13 +0000447 # issue 5008
448 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000449 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000450 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000451 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000452 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000454 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000455 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000456 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000457
Guido van Rossum87429772007-04-10 21:06:59 +0000458 def test_destructor(self):
459 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000460 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000461 def __del__(self):
462 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000463 try:
464 f = super().__del__
465 except AttributeError:
466 pass
467 else:
468 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000469 def close(self):
470 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000471 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000472 def flush(self):
473 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000474 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000475 with support.check_warnings(('', ResourceWarning)):
476 f = MyFileIO(support.TESTFN, "wb")
477 f.write(b"xxx")
478 del f
479 support.gc_collect()
480 self.assertEqual(record, [1, 2, 3])
481 with self.open(support.TESTFN, "rb") as f:
482 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000483
484 def _check_base_destructor(self, base):
485 record = []
486 class MyIO(base):
487 def __init__(self):
488 # This exercises the availability of attributes on object
489 # destruction.
490 # (in the C version, close() is called by the tp_dealloc
491 # function, not by __del__)
492 self.on_del = 1
493 self.on_close = 2
494 self.on_flush = 3
495 def __del__(self):
496 record.append(self.on_del)
497 try:
498 f = super().__del__
499 except AttributeError:
500 pass
501 else:
502 f()
503 def close(self):
504 record.append(self.on_close)
505 super().close()
506 def flush(self):
507 record.append(self.on_flush)
508 super().flush()
509 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000510 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000511 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000512 self.assertEqual(record, [1, 2, 3])
513
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000514 def test_IOBase_destructor(self):
515 self._check_base_destructor(self.IOBase)
516
517 def test_RawIOBase_destructor(self):
518 self._check_base_destructor(self.RawIOBase)
519
520 def test_BufferedIOBase_destructor(self):
521 self._check_base_destructor(self.BufferedIOBase)
522
523 def test_TextIOBase_destructor(self):
524 self._check_base_destructor(self.TextIOBase)
525
Guido van Rossum87429772007-04-10 21:06:59 +0000526 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000527 with self.open(support.TESTFN, "wb") as f:
528 f.write(b"xxx")
529 with self.open(support.TESTFN, "rb") as f:
530 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000531
Guido van Rossumd4103952007-04-12 05:44:49 +0000532 def test_array_writes(self):
533 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000534 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000535 with self.open(support.TESTFN, "wb", 0) as f:
536 self.assertEqual(f.write(a), n)
537 with self.open(support.TESTFN, "wb") as f:
538 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000539
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000540 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000541 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000542 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000543
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000544 def test_read_closed(self):
545 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000546 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000547 with self.open(support.TESTFN, "r") as f:
548 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000549 self.assertEqual(file.read(), "egg\n")
550 file.seek(0)
551 file.close()
552 self.assertRaises(ValueError, file.read)
553
554 def test_no_closefd_with_filename(self):
555 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000556 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000557
558 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000559 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000560 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000561 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000562 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000563 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000564 self.assertEqual(file.buffer.raw.closefd, False)
565
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000566 def test_garbage_collection(self):
567 # FileIO objects are collected, and collecting them flushes
568 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000569 with support.check_warnings(('', ResourceWarning)):
570 f = self.FileIO(support.TESTFN, "wb")
571 f.write(b"abcxxx")
572 f.f = f
573 wr = weakref.ref(f)
574 del f
575 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000576 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000577 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000578 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000579
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000580 def test_unbounded_file(self):
581 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
582 zero = "/dev/zero"
583 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000584 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000585 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000586 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000587 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000588 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000589 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000590 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000591 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000592 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000593 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000594 self.assertRaises(OverflowError, f.read)
595
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200596 def check_flush_error_on_close(self, *args, **kwargs):
597 # Test that the file is closed despite failed flush
598 # and that flush() is called before file closed.
599 f = self.open(*args, **kwargs)
600 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000601 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200602 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200603 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000604 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200605 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600606 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200607 self.assertTrue(closed) # flush() called
608 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200609 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200610
611 def test_flush_error_on_close(self):
612 # raw file
613 # Issue #5700: io.FileIO calls flush() after file closed
614 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
615 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
616 self.check_flush_error_on_close(fd, 'wb', buffering=0)
617 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
618 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
619 os.close(fd)
620 # buffered io
621 self.check_flush_error_on_close(support.TESTFN, 'wb')
622 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
623 self.check_flush_error_on_close(fd, 'wb')
624 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
625 self.check_flush_error_on_close(fd, 'wb', closefd=False)
626 os.close(fd)
627 # text io
628 self.check_flush_error_on_close(support.TESTFN, 'w')
629 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
630 self.check_flush_error_on_close(fd, 'w')
631 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
632 self.check_flush_error_on_close(fd, 'w', closefd=False)
633 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000634
635 def test_multi_close(self):
636 f = self.open(support.TESTFN, "wb", buffering=0)
637 f.close()
638 f.close()
639 f.close()
640 self.assertRaises(ValueError, f.flush)
641
Antoine Pitrou328ec742010-09-14 18:37:24 +0000642 def test_RawIOBase_read(self):
643 # Exercise the default RawIOBase.read() implementation (which calls
644 # readinto() internally).
645 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
646 self.assertEqual(rawio.read(2), b"ab")
647 self.assertEqual(rawio.read(2), b"c")
648 self.assertEqual(rawio.read(2), b"d")
649 self.assertEqual(rawio.read(2), None)
650 self.assertEqual(rawio.read(2), b"ef")
651 self.assertEqual(rawio.read(2), b"g")
652 self.assertEqual(rawio.read(2), None)
653 self.assertEqual(rawio.read(2), b"")
654
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400655 def test_types_have_dict(self):
656 test = (
657 self.IOBase(),
658 self.RawIOBase(),
659 self.TextIOBase(),
660 self.StringIO(),
661 self.BytesIO()
662 )
663 for obj in test:
664 self.assertTrue(hasattr(obj, "__dict__"))
665
Ross Lagerwall59142db2011-10-31 20:34:46 +0200666 def test_opener(self):
667 with self.open(support.TESTFN, "w") as f:
668 f.write("egg\n")
669 fd = os.open(support.TESTFN, os.O_RDONLY)
670 def opener(path, flags):
671 return fd
672 with self.open("non-existent", "r", opener=opener) as f:
673 self.assertEqual(f.read(), "egg\n")
674
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200675 def test_fileio_closefd(self):
676 # Issue #4841
677 with self.open(__file__, 'rb') as f1, \
678 self.open(__file__, 'rb') as f2:
679 fileio = self.FileIO(f1.fileno(), closefd=False)
680 # .__init__() must not close f1
681 fileio.__init__(f2.fileno(), closefd=False)
682 f1.readline()
683 # .close() must not close f2
684 fileio.close()
685 f2.readline()
686
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300687 def test_nonbuffered_textio(self):
688 with warnings.catch_warnings(record=True) as recorded:
689 with self.assertRaises(ValueError):
690 self.open(support.TESTFN, 'w', buffering=0)
691 support.gc_collect()
692 self.assertEqual(recorded, [])
693
694 def test_invalid_newline(self):
695 with warnings.catch_warnings(record=True) as recorded:
696 with self.assertRaises(ValueError):
697 self.open(support.TESTFN, 'w', newline='invalid')
698 support.gc_collect()
699 self.assertEqual(recorded, [])
700
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200701
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000702class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200703
704 def test_IOBase_finalize(self):
705 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
706 # class which inherits IOBase and an object of this class are caught
707 # in a reference cycle and close() is already in the method cache.
708 class MyIO(self.IOBase):
709 def close(self):
710 pass
711
712 # create an instance to populate the method cache
713 MyIO()
714 obj = MyIO()
715 obj.obj = obj
716 wr = weakref.ref(obj)
717 del MyIO
718 del obj
719 support.gc_collect()
720 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000721
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000722class PyIOTest(IOTest):
723 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000724
Guido van Rossuma9e20242007-03-08 00:43:48 +0000725
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000726class CommonBufferedTests:
727 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
728
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000729 def test_detach(self):
730 raw = self.MockRawIO()
731 buf = self.tp(raw)
732 self.assertIs(buf.detach(), raw)
733 self.assertRaises(ValueError, buf.detach)
734
Benjamin Peterson10e76b62014-12-21 20:51:50 -0600735 repr(buf) # Should still work
736
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000737 def test_fileno(self):
738 rawio = self.MockRawIO()
739 bufio = self.tp(rawio)
740
Ezio Melottib3aedd42010-11-20 19:04:17 +0000741 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000742
Zachary Ware9fe6d862013-12-08 00:20:35 -0600743 @unittest.skip('test having existential crisis')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000744 def test_no_fileno(self):
745 # XXX will we always have fileno() function? If so, kill
746 # this test. Else, write it.
747 pass
748
749 def test_invalid_args(self):
750 rawio = self.MockRawIO()
751 bufio = self.tp(rawio)
752 # Invalid whence
753 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200754 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000755
756 def test_override_destructor(self):
757 tp = self.tp
758 record = []
759 class MyBufferedIO(tp):
760 def __del__(self):
761 record.append(1)
762 try:
763 f = super().__del__
764 except AttributeError:
765 pass
766 else:
767 f()
768 def close(self):
769 record.append(2)
770 super().close()
771 def flush(self):
772 record.append(3)
773 super().flush()
774 rawio = self.MockRawIO()
775 bufio = MyBufferedIO(rawio)
776 writable = bufio.writable()
777 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000778 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000779 if writable:
780 self.assertEqual(record, [1, 2, 3])
781 else:
782 self.assertEqual(record, [1, 2])
783
784 def test_context_manager(self):
785 # Test usability as a context manager
786 rawio = self.MockRawIO()
787 bufio = self.tp(rawio)
788 def _with():
789 with bufio:
790 pass
791 _with()
792 # bufio should now be closed, and using it a second time should raise
793 # a ValueError.
794 self.assertRaises(ValueError, _with)
795
796 def test_error_through_destructor(self):
797 # Test that the exception state is not modified by a destructor,
798 # even if close() fails.
799 rawio = self.CloseFailureIO()
800 def f():
801 self.tp(rawio).xyzzy
802 with support.captured_output("stderr") as s:
803 self.assertRaises(AttributeError, f)
804 s = s.getvalue().strip()
805 if s:
806 # The destructor *may* have printed an unraisable error, check it
807 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200808 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000809 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000810
Antoine Pitrou716c4442009-05-23 19:04:03 +0000811 def test_repr(self):
812 raw = self.MockRawIO()
813 b = self.tp(raw)
814 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
815 self.assertEqual(repr(b), "<%s>" % clsname)
816 raw.name = "dummy"
817 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
818 raw.name = b"dummy"
819 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
820
Antoine Pitrou6be88762010-05-03 16:48:20 +0000821 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200822 # Test that buffered file is closed despite failed flush
823 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +0000824 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200825 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000826 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200827 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200828 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000829 raw.flush = bad_flush
830 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200831 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600832 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200833 self.assertTrue(raw.closed)
834 self.assertTrue(closed) # flush() called
835 self.assertFalse(closed[0]) # flush() called before file closed
836 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200837 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -0600838
839 def test_close_error_on_close(self):
840 raw = self.MockRawIO()
841 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200842 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -0600843 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200844 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -0600845 raw.close = bad_close
846 b = self.tp(raw)
847 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200848 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600849 b.close()
850 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +0300851 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -0600852 self.assertEqual(err.exception.__context__.args, ('flush',))
853 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000854
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +0300855 def test_nonnormalized_close_error_on_close(self):
856 # Issue #21677
857 raw = self.MockRawIO()
858 def bad_flush():
859 raise non_existing_flush
860 def bad_close():
861 raise non_existing_close
862 raw.close = bad_close
863 b = self.tp(raw)
864 b.flush = bad_flush
865 with self.assertRaises(NameError) as err: # exception not swallowed
866 b.close()
867 self.assertIn('non_existing_close', str(err.exception))
868 self.assertIsInstance(err.exception.__context__, NameError)
869 self.assertIn('non_existing_flush', str(err.exception.__context__))
870 self.assertFalse(b.closed)
871
Antoine Pitrou6be88762010-05-03 16:48:20 +0000872 def test_multi_close(self):
873 raw = self.MockRawIO()
874 b = self.tp(raw)
875 b.close()
876 b.close()
877 b.close()
878 self.assertRaises(ValueError, b.flush)
879
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000880 def test_unseekable(self):
881 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
882 self.assertRaises(self.UnsupportedOperation, bufio.tell)
883 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
884
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000885 def test_readonly_attributes(self):
886 raw = self.MockRawIO()
887 buf = self.tp(raw)
888 x = self.MockRawIO()
889 with self.assertRaises(AttributeError):
890 buf.raw = x
891
Guido van Rossum78892e42007-04-06 17:31:18 +0000892
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200893class SizeofTest:
894
895 @support.cpython_only
896 def test_sizeof(self):
897 bufsize1 = 4096
898 bufsize2 = 8192
899 rawio = self.MockRawIO()
900 bufio = self.tp(rawio, buffer_size=bufsize1)
901 size = sys.getsizeof(bufio) - bufsize1
902 rawio = self.MockRawIO()
903 bufio = self.tp(rawio, buffer_size=bufsize2)
904 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
905
Jesus Ceadc469452012-10-04 12:37:56 +0200906 @support.cpython_only
907 def test_buffer_freeing(self) :
908 bufsize = 4096
909 rawio = self.MockRawIO()
910 bufio = self.tp(rawio, buffer_size=bufsize)
911 size = sys.getsizeof(bufio) - bufsize
912 bufio.close()
913 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200914
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000915class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
916 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000917
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000918 def test_constructor(self):
919 rawio = self.MockRawIO([b"abc"])
920 bufio = self.tp(rawio)
921 bufio.__init__(rawio)
922 bufio.__init__(rawio, buffer_size=1024)
923 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000924 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000925 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
926 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
927 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
928 rawio = self.MockRawIO([b"abc"])
929 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000930 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000931
Serhiy Storchaka61e24932014-02-12 10:52:35 +0200932 def test_uninitialized(self):
933 bufio = self.tp.__new__(self.tp)
934 del bufio
935 bufio = self.tp.__new__(self.tp)
936 self.assertRaisesRegex((ValueError, AttributeError),
937 'uninitialized|has no attribute',
938 bufio.read, 0)
939 bufio.__init__(self.MockRawIO())
940 self.assertEqual(bufio.read(0), b'')
941
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000942 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000943 for arg in (None, 7):
944 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
945 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000946 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000947 # Invalid args
948 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000949
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000950 def test_read1(self):
951 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
952 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000953 self.assertEqual(b"a", bufio.read(1))
954 self.assertEqual(b"b", bufio.read1(1))
955 self.assertEqual(rawio._reads, 1)
956 self.assertEqual(b"c", bufio.read1(100))
957 self.assertEqual(rawio._reads, 1)
958 self.assertEqual(b"d", bufio.read1(100))
959 self.assertEqual(rawio._reads, 2)
960 self.assertEqual(b"efg", bufio.read1(100))
961 self.assertEqual(rawio._reads, 3)
962 self.assertEqual(b"", bufio.read1(100))
963 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000964 # Invalid args
965 self.assertRaises(ValueError, bufio.read1, -1)
966
967 def test_readinto(self):
968 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
969 bufio = self.tp(rawio)
970 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000971 self.assertEqual(bufio.readinto(b), 2)
972 self.assertEqual(b, b"ab")
973 self.assertEqual(bufio.readinto(b), 2)
974 self.assertEqual(b, b"cd")
975 self.assertEqual(bufio.readinto(b), 2)
976 self.assertEqual(b, b"ef")
977 self.assertEqual(bufio.readinto(b), 1)
978 self.assertEqual(b, b"gf")
979 self.assertEqual(bufio.readinto(b), 0)
980 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +0200981 rawio = self.MockRawIO((b"abc", None))
982 bufio = self.tp(rawio)
983 self.assertEqual(bufio.readinto(b), 2)
984 self.assertEqual(b, b"ab")
985 self.assertEqual(bufio.readinto(b), 1)
986 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000987
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000988 def test_readlines(self):
989 def bufio():
990 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
991 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000992 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
993 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
994 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000995
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000996 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000997 data = b"abcdefghi"
998 dlen = len(data)
999
1000 tests = [
1001 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1002 [ 100, [ 3, 3, 3], [ dlen ] ],
1003 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1004 ]
1005
1006 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001007 rawio = self.MockFileIO(data)
1008 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001009 pos = 0
1010 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001011 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001012 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001013 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001014 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001015
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001016 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001017 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001018 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1019 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001020 self.assertEqual(b"abcd", bufio.read(6))
1021 self.assertEqual(b"e", bufio.read(1))
1022 self.assertEqual(b"fg", bufio.read())
1023 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001024 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001025 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001026
Victor Stinnera80987f2011-05-25 22:47:16 +02001027 rawio = self.MockRawIO((b"a", None, None))
1028 self.assertEqual(b"a", rawio.readall())
1029 self.assertIsNone(rawio.readall())
1030
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001031 def test_read_past_eof(self):
1032 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1033 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001034
Ezio Melottib3aedd42010-11-20 19:04:17 +00001035 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001036
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001037 def test_read_all(self):
1038 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1039 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001040
Ezio Melottib3aedd42010-11-20 19:04:17 +00001041 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001042
Victor Stinner45df8202010-04-28 22:31:17 +00001043 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001044 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001045 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001046 try:
1047 # Write out many bytes with exactly the same number of 0's,
1048 # 1's... 255's. This will help us check that concurrent reading
1049 # doesn't duplicate or forget contents.
1050 N = 1000
1051 l = list(range(256)) * N
1052 random.shuffle(l)
1053 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001054 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001055 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001056 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001057 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001058 errors = []
1059 results = []
1060 def f():
1061 try:
1062 # Intra-buffer read then buffer-flushing read
1063 for n in cycle([1, 19]):
1064 s = bufio.read(n)
1065 if not s:
1066 break
1067 # list.append() is atomic
1068 results.append(s)
1069 except Exception as e:
1070 errors.append(e)
1071 raise
1072 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001073 with support.start_threads(threads):
1074 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001075 self.assertFalse(errors,
1076 "the following exceptions were caught: %r" % errors)
1077 s = b''.join(results)
1078 for i in range(256):
1079 c = bytes(bytearray([i]))
1080 self.assertEqual(s.count(c), N)
1081 finally:
1082 support.unlink(support.TESTFN)
1083
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001084 def test_unseekable(self):
1085 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1086 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1087 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1088 bufio.read(1)
1089 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1090 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1091
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001092 def test_misbehaved_io(self):
1093 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1094 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001095 self.assertRaises(OSError, bufio.seek, 0)
1096 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001097
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001098 def test_no_extraneous_read(self):
1099 # Issue #9550; when the raw IO object has satisfied the read request,
1100 # we should not issue any additional reads, otherwise it may block
1101 # (e.g. socket).
1102 bufsize = 16
1103 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1104 rawio = self.MockRawIO([b"x" * n])
1105 bufio = self.tp(rawio, bufsize)
1106 self.assertEqual(bufio.read(n), b"x" * n)
1107 # Simple case: one raw read is enough to satisfy the request.
1108 self.assertEqual(rawio._extraneous_reads, 0,
1109 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1110 # A more complex case where two raw reads are needed to satisfy
1111 # the request.
1112 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1113 bufio = self.tp(rawio, bufsize)
1114 self.assertEqual(bufio.read(n), b"x" * n)
1115 self.assertEqual(rawio._extraneous_reads, 0,
1116 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1117
Berker Peksagea6d5592015-05-12 17:13:56 +03001118 def test_read_on_closed(self):
1119 # Issue #23796
1120 b = io.BufferedReader(io.BytesIO(b"12"))
1121 b.read(1)
1122 b.close()
1123 self.assertRaises(ValueError, b.peek)
1124 self.assertRaises(ValueError, b.read1, 1)
1125
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001126
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001127class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001128 tp = io.BufferedReader
1129
1130 def test_constructor(self):
1131 BufferedReaderTest.test_constructor(self)
1132 # The allocation can succeed on 32-bit builds, e.g. with more
1133 # than 2GB RAM and a 64-bit kernel.
1134 if sys.maxsize > 0x7FFFFFFF:
1135 rawio = self.MockRawIO()
1136 bufio = self.tp(rawio)
1137 self.assertRaises((OverflowError, MemoryError, ValueError),
1138 bufio.__init__, rawio, sys.maxsize)
1139
1140 def test_initialization(self):
1141 rawio = self.MockRawIO([b"abc"])
1142 bufio = self.tp(rawio)
1143 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1144 self.assertRaises(ValueError, bufio.read)
1145 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1146 self.assertRaises(ValueError, bufio.read)
1147 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1148 self.assertRaises(ValueError, bufio.read)
1149
1150 def test_misbehaved_io_read(self):
1151 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1152 bufio = self.tp(rawio)
1153 # _pyio.BufferedReader seems to implement reading different, so that
1154 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001155 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001156
1157 def test_garbage_collection(self):
1158 # C BufferedReader objects are collected.
1159 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001160 with support.check_warnings(('', ResourceWarning)):
1161 rawio = self.FileIO(support.TESTFN, "w+b")
1162 f = self.tp(rawio)
1163 f.f = f
1164 wr = weakref.ref(f)
1165 del f
1166 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001167 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001168
R David Murray67bfe802013-02-23 21:51:05 -05001169 def test_args_error(self):
1170 # Issue #17275
1171 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1172 self.tp(io.BytesIO(), 1024, 1024, 1024)
1173
1174
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001175class PyBufferedReaderTest(BufferedReaderTest):
1176 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001177
Guido van Rossuma9e20242007-03-08 00:43:48 +00001178
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001179class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1180 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001181
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001182 def test_constructor(self):
1183 rawio = self.MockRawIO()
1184 bufio = self.tp(rawio)
1185 bufio.__init__(rawio)
1186 bufio.__init__(rawio, buffer_size=1024)
1187 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001188 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001189 bufio.flush()
1190 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1191 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1192 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1193 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001194 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001195 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001196 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001197
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001198 def test_uninitialized(self):
1199 bufio = self.tp.__new__(self.tp)
1200 del bufio
1201 bufio = self.tp.__new__(self.tp)
1202 self.assertRaisesRegex((ValueError, AttributeError),
1203 'uninitialized|has no attribute',
1204 bufio.write, b'')
1205 bufio.__init__(self.MockRawIO())
1206 self.assertEqual(bufio.write(b''), 0)
1207
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001208 def test_detach_flush(self):
1209 raw = self.MockRawIO()
1210 buf = self.tp(raw)
1211 buf.write(b"howdy!")
1212 self.assertFalse(raw._write_stack)
1213 buf.detach()
1214 self.assertEqual(raw._write_stack, [b"howdy!"])
1215
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001216 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001217 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001218 writer = self.MockRawIO()
1219 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001220 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001221 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001222
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001223 def test_write_overflow(self):
1224 writer = self.MockRawIO()
1225 bufio = self.tp(writer, 8)
1226 contents = b"abcdefghijklmnop"
1227 for n in range(0, len(contents), 3):
1228 bufio.write(contents[n:n+3])
1229 flushed = b"".join(writer._write_stack)
1230 # At least (total - 8) bytes were implicitly flushed, perhaps more
1231 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001232 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001233
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001234 def check_writes(self, intermediate_func):
1235 # Lots of writes, test the flushed output is as expected.
1236 contents = bytes(range(256)) * 1000
1237 n = 0
1238 writer = self.MockRawIO()
1239 bufio = self.tp(writer, 13)
1240 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1241 def gen_sizes():
1242 for size in count(1):
1243 for i in range(15):
1244 yield size
1245 sizes = gen_sizes()
1246 while n < len(contents):
1247 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001248 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001249 intermediate_func(bufio)
1250 n += size
1251 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001252 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001253
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001254 def test_writes(self):
1255 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001256
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001257 def test_writes_and_flushes(self):
1258 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001259
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001260 def test_writes_and_seeks(self):
1261 def _seekabs(bufio):
1262 pos = bufio.tell()
1263 bufio.seek(pos + 1, 0)
1264 bufio.seek(pos - 1, 0)
1265 bufio.seek(pos, 0)
1266 self.check_writes(_seekabs)
1267 def _seekrel(bufio):
1268 pos = bufio.seek(0, 1)
1269 bufio.seek(+1, 1)
1270 bufio.seek(-1, 1)
1271 bufio.seek(pos, 0)
1272 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001273
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001274 def test_writes_and_truncates(self):
1275 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001276
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001277 def test_write_non_blocking(self):
1278 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001279 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001280
Ezio Melottib3aedd42010-11-20 19:04:17 +00001281 self.assertEqual(bufio.write(b"abcd"), 4)
1282 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001283 # 1 byte will be written, the rest will be buffered
1284 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001285 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001286
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001287 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1288 raw.block_on(b"0")
1289 try:
1290 bufio.write(b"opqrwxyz0123456789")
1291 except self.BlockingIOError as e:
1292 written = e.characters_written
1293 else:
1294 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001295 self.assertEqual(written, 16)
1296 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001297 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001298
Ezio Melottib3aedd42010-11-20 19:04:17 +00001299 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001300 s = raw.pop_written()
1301 # Previously buffered bytes were flushed
1302 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001303
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001304 def test_write_and_rewind(self):
1305 raw = io.BytesIO()
1306 bufio = self.tp(raw, 4)
1307 self.assertEqual(bufio.write(b"abcdef"), 6)
1308 self.assertEqual(bufio.tell(), 6)
1309 bufio.seek(0, 0)
1310 self.assertEqual(bufio.write(b"XY"), 2)
1311 bufio.seek(6, 0)
1312 self.assertEqual(raw.getvalue(), b"XYcdef")
1313 self.assertEqual(bufio.write(b"123456"), 6)
1314 bufio.flush()
1315 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001316
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001317 def test_flush(self):
1318 writer = self.MockRawIO()
1319 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001320 bufio.write(b"abc")
1321 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001322 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001323
Antoine Pitrou131a4892012-10-16 22:57:11 +02001324 def test_writelines(self):
1325 l = [b'ab', b'cd', b'ef']
1326 writer = self.MockRawIO()
1327 bufio = self.tp(writer, 8)
1328 bufio.writelines(l)
1329 bufio.flush()
1330 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1331
1332 def test_writelines_userlist(self):
1333 l = UserList([b'ab', b'cd', b'ef'])
1334 writer = self.MockRawIO()
1335 bufio = self.tp(writer, 8)
1336 bufio.writelines(l)
1337 bufio.flush()
1338 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1339
1340 def test_writelines_error(self):
1341 writer = self.MockRawIO()
1342 bufio = self.tp(writer, 8)
1343 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1344 self.assertRaises(TypeError, bufio.writelines, None)
1345 self.assertRaises(TypeError, bufio.writelines, 'abc')
1346
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001347 def test_destructor(self):
1348 writer = self.MockRawIO()
1349 bufio = self.tp(writer, 8)
1350 bufio.write(b"abc")
1351 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001352 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001353 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001354
1355 def test_truncate(self):
1356 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001357 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001358 bufio = self.tp(raw, 8)
1359 bufio.write(b"abcdef")
1360 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001361 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001362 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001363 self.assertEqual(f.read(), b"abc")
1364
Victor Stinner45df8202010-04-28 22:31:17 +00001365 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001366 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001367 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001368 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001369 # Write out many bytes from many threads and test they were
1370 # all flushed.
1371 N = 1000
1372 contents = bytes(range(256)) * N
1373 sizes = cycle([1, 19])
1374 n = 0
1375 queue = deque()
1376 while n < len(contents):
1377 size = next(sizes)
1378 queue.append(contents[n:n+size])
1379 n += size
1380 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001381 # We use a real file object because it allows us to
1382 # exercise situations where the GIL is released before
1383 # writing the buffer to the raw streams. This is in addition
1384 # to concurrency issues due to switching threads in the middle
1385 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001386 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001387 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001388 errors = []
1389 def f():
1390 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001391 while True:
1392 try:
1393 s = queue.popleft()
1394 except IndexError:
1395 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001396 bufio.write(s)
1397 except Exception as e:
1398 errors.append(e)
1399 raise
1400 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001401 with support.start_threads(threads):
1402 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001403 self.assertFalse(errors,
1404 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001405 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001406 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001407 s = f.read()
1408 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001409 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001410 finally:
1411 support.unlink(support.TESTFN)
1412
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001413 def test_misbehaved_io(self):
1414 rawio = self.MisbehavedRawIO()
1415 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001416 self.assertRaises(OSError, bufio.seek, 0)
1417 self.assertRaises(OSError, bufio.tell)
1418 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001419
Florent Xicluna109d5732012-07-07 17:03:22 +02001420 def test_max_buffer_size_removal(self):
1421 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001422 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001423
Benjamin Peterson68623612012-12-20 11:53:11 -06001424 def test_write_error_on_close(self):
1425 raw = self.MockRawIO()
1426 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001427 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001428 raw.write = bad_write
1429 b = self.tp(raw)
1430 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001431 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001432 self.assertTrue(b.closed)
1433
Benjamin Peterson59406a92009-03-26 17:10:29 +00001434
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001435class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001436 tp = io.BufferedWriter
1437
1438 def test_constructor(self):
1439 BufferedWriterTest.test_constructor(self)
1440 # The allocation can succeed on 32-bit builds, e.g. with more
1441 # than 2GB RAM and a 64-bit kernel.
1442 if sys.maxsize > 0x7FFFFFFF:
1443 rawio = self.MockRawIO()
1444 bufio = self.tp(rawio)
1445 self.assertRaises((OverflowError, MemoryError, ValueError),
1446 bufio.__init__, rawio, sys.maxsize)
1447
1448 def test_initialization(self):
1449 rawio = self.MockRawIO()
1450 bufio = self.tp(rawio)
1451 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1452 self.assertRaises(ValueError, bufio.write, b"def")
1453 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1454 self.assertRaises(ValueError, bufio.write, b"def")
1455 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1456 self.assertRaises(ValueError, bufio.write, b"def")
1457
1458 def test_garbage_collection(self):
1459 # C BufferedWriter objects are collected, and collecting them flushes
1460 # all data to disk.
1461 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001462 with support.check_warnings(('', ResourceWarning)):
1463 rawio = self.FileIO(support.TESTFN, "w+b")
1464 f = self.tp(rawio)
1465 f.write(b"123xxx")
1466 f.x = f
1467 wr = weakref.ref(f)
1468 del f
1469 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001470 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001471 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001472 self.assertEqual(f.read(), b"123xxx")
1473
R David Murray67bfe802013-02-23 21:51:05 -05001474 def test_args_error(self):
1475 # Issue #17275
1476 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1477 self.tp(io.BytesIO(), 1024, 1024, 1024)
1478
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001479
1480class PyBufferedWriterTest(BufferedWriterTest):
1481 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001482
Guido van Rossum01a27522007-03-07 01:00:12 +00001483class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001484
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001485 def test_constructor(self):
1486 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001487 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001488
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001489 def test_uninitialized(self):
1490 pair = self.tp.__new__(self.tp)
1491 del pair
1492 pair = self.tp.__new__(self.tp)
1493 self.assertRaisesRegex((ValueError, AttributeError),
1494 'uninitialized|has no attribute',
1495 pair.read, 0)
1496 self.assertRaisesRegex((ValueError, AttributeError),
1497 'uninitialized|has no attribute',
1498 pair.write, b'')
1499 pair.__init__(self.MockRawIO(), self.MockRawIO())
1500 self.assertEqual(pair.read(0), b'')
1501 self.assertEqual(pair.write(b''), 0)
1502
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001503 def test_detach(self):
1504 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1505 self.assertRaises(self.UnsupportedOperation, pair.detach)
1506
Florent Xicluna109d5732012-07-07 17:03:22 +02001507 def test_constructor_max_buffer_size_removal(self):
1508 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001509 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001510
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001511 def test_constructor_with_not_readable(self):
1512 class NotReadable(MockRawIO):
1513 def readable(self):
1514 return False
1515
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001516 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001517
1518 def test_constructor_with_not_writeable(self):
1519 class NotWriteable(MockRawIO):
1520 def writable(self):
1521 return False
1522
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001523 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001524
1525 def test_read(self):
1526 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1527
1528 self.assertEqual(pair.read(3), b"abc")
1529 self.assertEqual(pair.read(1), b"d")
1530 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001531 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1532 self.assertEqual(pair.read(None), b"abc")
1533
1534 def test_readlines(self):
1535 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1536 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1537 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1538 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001539
1540 def test_read1(self):
1541 # .read1() is delegated to the underlying reader object, so this test
1542 # can be shallow.
1543 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1544
1545 self.assertEqual(pair.read1(3), b"abc")
1546
1547 def test_readinto(self):
1548 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1549
1550 data = bytearray(5)
1551 self.assertEqual(pair.readinto(data), 5)
1552 self.assertEqual(data, b"abcde")
1553
1554 def test_write(self):
1555 w = self.MockRawIO()
1556 pair = self.tp(self.MockRawIO(), w)
1557
1558 pair.write(b"abc")
1559 pair.flush()
1560 pair.write(b"def")
1561 pair.flush()
1562 self.assertEqual(w._write_stack, [b"abc", b"def"])
1563
1564 def test_peek(self):
1565 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1566
1567 self.assertTrue(pair.peek(3).startswith(b"abc"))
1568 self.assertEqual(pair.read(3), b"abc")
1569
1570 def test_readable(self):
1571 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1572 self.assertTrue(pair.readable())
1573
1574 def test_writeable(self):
1575 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1576 self.assertTrue(pair.writable())
1577
1578 def test_seekable(self):
1579 # BufferedRWPairs are never seekable, even if their readers and writers
1580 # are.
1581 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1582 self.assertFalse(pair.seekable())
1583
1584 # .flush() is delegated to the underlying writer object and has been
1585 # tested in the test_write method.
1586
1587 def test_close_and_closed(self):
1588 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1589 self.assertFalse(pair.closed)
1590 pair.close()
1591 self.assertTrue(pair.closed)
1592
Serhiy Storchaka7665be62015-03-24 23:21:57 +02001593 def test_reader_close_error_on_close(self):
1594 def reader_close():
1595 reader_non_existing
1596 reader = self.MockRawIO()
1597 reader.close = reader_close
1598 writer = self.MockRawIO()
1599 pair = self.tp(reader, writer)
1600 with self.assertRaises(NameError) as err:
1601 pair.close()
1602 self.assertIn('reader_non_existing', str(err.exception))
1603 self.assertTrue(pair.closed)
1604 self.assertFalse(reader.closed)
1605 self.assertTrue(writer.closed)
1606
1607 def test_writer_close_error_on_close(self):
1608 def writer_close():
1609 writer_non_existing
1610 reader = self.MockRawIO()
1611 writer = self.MockRawIO()
1612 writer.close = writer_close
1613 pair = self.tp(reader, writer)
1614 with self.assertRaises(NameError) as err:
1615 pair.close()
1616 self.assertIn('writer_non_existing', str(err.exception))
1617 self.assertFalse(pair.closed)
1618 self.assertTrue(reader.closed)
1619 self.assertFalse(writer.closed)
1620
1621 def test_reader_writer_close_error_on_close(self):
1622 def reader_close():
1623 reader_non_existing
1624 def writer_close():
1625 writer_non_existing
1626 reader = self.MockRawIO()
1627 reader.close = reader_close
1628 writer = self.MockRawIO()
1629 writer.close = writer_close
1630 pair = self.tp(reader, writer)
1631 with self.assertRaises(NameError) as err:
1632 pair.close()
1633 self.assertIn('reader_non_existing', str(err.exception))
1634 self.assertIsInstance(err.exception.__context__, NameError)
1635 self.assertIn('writer_non_existing', str(err.exception.__context__))
1636 self.assertFalse(pair.closed)
1637 self.assertFalse(reader.closed)
1638 self.assertFalse(writer.closed)
1639
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001640 def test_isatty(self):
1641 class SelectableIsAtty(MockRawIO):
1642 def __init__(self, isatty):
1643 MockRawIO.__init__(self)
1644 self._isatty = isatty
1645
1646 def isatty(self):
1647 return self._isatty
1648
1649 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1650 self.assertFalse(pair.isatty())
1651
1652 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1653 self.assertTrue(pair.isatty())
1654
1655 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1656 self.assertTrue(pair.isatty())
1657
1658 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1659 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001660
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04001661 def test_weakref_clearing(self):
1662 brw = self.tp(self.MockRawIO(), self.MockRawIO())
1663 ref = weakref.ref(brw)
1664 brw = None
1665 ref = None # Shouldn't segfault.
1666
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001667class CBufferedRWPairTest(BufferedRWPairTest):
1668 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001669
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001670class PyBufferedRWPairTest(BufferedRWPairTest):
1671 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001672
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001673
1674class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1675 read_mode = "rb+"
1676 write_mode = "wb+"
1677
1678 def test_constructor(self):
1679 BufferedReaderTest.test_constructor(self)
1680 BufferedWriterTest.test_constructor(self)
1681
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001682 def test_uninitialized(self):
1683 BufferedReaderTest.test_uninitialized(self)
1684 BufferedWriterTest.test_uninitialized(self)
1685
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001686 def test_read_and_write(self):
1687 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001688 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001689
1690 self.assertEqual(b"as", rw.read(2))
1691 rw.write(b"ddd")
1692 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001693 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001694 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001695 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001696
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001697 def test_seek_and_tell(self):
1698 raw = self.BytesIO(b"asdfghjkl")
1699 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001700
Ezio Melottib3aedd42010-11-20 19:04:17 +00001701 self.assertEqual(b"as", rw.read(2))
1702 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001703 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001704 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001705
Antoine Pitroue05565e2011-08-20 14:39:23 +02001706 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001707 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001708 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001709 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001710 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001711 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001712 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001713 self.assertEqual(7, rw.tell())
1714 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001715 rw.flush()
1716 self.assertEqual(b"asdf123fl", raw.getvalue())
1717
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001718 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001719
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001720 def check_flush_and_read(self, read_func):
1721 raw = self.BytesIO(b"abcdefghi")
1722 bufio = self.tp(raw)
1723
Ezio Melottib3aedd42010-11-20 19:04:17 +00001724 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001725 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001726 self.assertEqual(b"ef", read_func(bufio, 2))
1727 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001728 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001729 self.assertEqual(6, bufio.tell())
1730 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001731 raw.seek(0, 0)
1732 raw.write(b"XYZ")
1733 # flush() resets the read buffer
1734 bufio.flush()
1735 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001736 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001737
1738 def test_flush_and_read(self):
1739 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1740
1741 def test_flush_and_readinto(self):
1742 def _readinto(bufio, n=-1):
1743 b = bytearray(n if n >= 0 else 9999)
1744 n = bufio.readinto(b)
1745 return bytes(b[:n])
1746 self.check_flush_and_read(_readinto)
1747
1748 def test_flush_and_peek(self):
1749 def _peek(bufio, n=-1):
1750 # This relies on the fact that the buffer can contain the whole
1751 # raw stream, otherwise peek() can return less.
1752 b = bufio.peek(n)
1753 if n != -1:
1754 b = b[:n]
1755 bufio.seek(len(b), 1)
1756 return b
1757 self.check_flush_and_read(_peek)
1758
1759 def test_flush_and_write(self):
1760 raw = self.BytesIO(b"abcdefghi")
1761 bufio = self.tp(raw)
1762
1763 bufio.write(b"123")
1764 bufio.flush()
1765 bufio.write(b"45")
1766 bufio.flush()
1767 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001768 self.assertEqual(b"12345fghi", raw.getvalue())
1769 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001770
1771 def test_threads(self):
1772 BufferedReaderTest.test_threads(self)
1773 BufferedWriterTest.test_threads(self)
1774
1775 def test_writes_and_peek(self):
1776 def _peek(bufio):
1777 bufio.peek(1)
1778 self.check_writes(_peek)
1779 def _peek(bufio):
1780 pos = bufio.tell()
1781 bufio.seek(-1, 1)
1782 bufio.peek(1)
1783 bufio.seek(pos, 0)
1784 self.check_writes(_peek)
1785
1786 def test_writes_and_reads(self):
1787 def _read(bufio):
1788 bufio.seek(-1, 1)
1789 bufio.read(1)
1790 self.check_writes(_read)
1791
1792 def test_writes_and_read1s(self):
1793 def _read1(bufio):
1794 bufio.seek(-1, 1)
1795 bufio.read1(1)
1796 self.check_writes(_read1)
1797
1798 def test_writes_and_readintos(self):
1799 def _read(bufio):
1800 bufio.seek(-1, 1)
1801 bufio.readinto(bytearray(1))
1802 self.check_writes(_read)
1803
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001804 def test_write_after_readahead(self):
1805 # Issue #6629: writing after the buffer was filled by readahead should
1806 # first rewind the raw stream.
1807 for overwrite_size in [1, 5]:
1808 raw = self.BytesIO(b"A" * 10)
1809 bufio = self.tp(raw, 4)
1810 # Trigger readahead
1811 self.assertEqual(bufio.read(1), b"A")
1812 self.assertEqual(bufio.tell(), 1)
1813 # Overwriting should rewind the raw stream if it needs so
1814 bufio.write(b"B" * overwrite_size)
1815 self.assertEqual(bufio.tell(), overwrite_size + 1)
1816 # If the write size was smaller than the buffer size, flush() and
1817 # check that rewind happens.
1818 bufio.flush()
1819 self.assertEqual(bufio.tell(), overwrite_size + 1)
1820 s = raw.getvalue()
1821 self.assertEqual(s,
1822 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1823
Antoine Pitrou7c404892011-05-13 00:13:33 +02001824 def test_write_rewind_write(self):
1825 # Various combinations of reading / writing / seeking backwards / writing again
1826 def mutate(bufio, pos1, pos2):
1827 assert pos2 >= pos1
1828 # Fill the buffer
1829 bufio.seek(pos1)
1830 bufio.read(pos2 - pos1)
1831 bufio.write(b'\x02')
1832 # This writes earlier than the previous write, but still inside
1833 # the buffer.
1834 bufio.seek(pos1)
1835 bufio.write(b'\x01')
1836
1837 b = b"\x80\x81\x82\x83\x84"
1838 for i in range(0, len(b)):
1839 for j in range(i, len(b)):
1840 raw = self.BytesIO(b)
1841 bufio = self.tp(raw, 100)
1842 mutate(bufio, i, j)
1843 bufio.flush()
1844 expected = bytearray(b)
1845 expected[j] = 2
1846 expected[i] = 1
1847 self.assertEqual(raw.getvalue(), expected,
1848 "failed result for i=%d, j=%d" % (i, j))
1849
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001850 def test_truncate_after_read_or_write(self):
1851 raw = self.BytesIO(b"A" * 10)
1852 bufio = self.tp(raw, 100)
1853 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1854 self.assertEqual(bufio.truncate(), 2)
1855 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1856 self.assertEqual(bufio.truncate(), 4)
1857
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001858 def test_misbehaved_io(self):
1859 BufferedReaderTest.test_misbehaved_io(self)
1860 BufferedWriterTest.test_misbehaved_io(self)
1861
Antoine Pitroue05565e2011-08-20 14:39:23 +02001862 def test_interleaved_read_write(self):
1863 # Test for issue #12213
1864 with self.BytesIO(b'abcdefgh') as raw:
1865 with self.tp(raw, 100) as f:
1866 f.write(b"1")
1867 self.assertEqual(f.read(1), b'b')
1868 f.write(b'2')
1869 self.assertEqual(f.read1(1), b'd')
1870 f.write(b'3')
1871 buf = bytearray(1)
1872 f.readinto(buf)
1873 self.assertEqual(buf, b'f')
1874 f.write(b'4')
1875 self.assertEqual(f.peek(1), b'h')
1876 f.flush()
1877 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1878
1879 with self.BytesIO(b'abc') as raw:
1880 with self.tp(raw, 100) as f:
1881 self.assertEqual(f.read(1), b'a')
1882 f.write(b"2")
1883 self.assertEqual(f.read(1), b'c')
1884 f.flush()
1885 self.assertEqual(raw.getvalue(), b'a2c')
1886
1887 def test_interleaved_readline_write(self):
1888 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1889 with self.tp(raw) as f:
1890 f.write(b'1')
1891 self.assertEqual(f.readline(), b'b\n')
1892 f.write(b'2')
1893 self.assertEqual(f.readline(), b'def\n')
1894 f.write(b'3')
1895 self.assertEqual(f.readline(), b'\n')
1896 f.flush()
1897 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1898
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001899 # You can't construct a BufferedRandom over a non-seekable stream.
1900 test_unseekable = None
1901
R David Murray67bfe802013-02-23 21:51:05 -05001902
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001903class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001904 tp = io.BufferedRandom
1905
1906 def test_constructor(self):
1907 BufferedRandomTest.test_constructor(self)
1908 # The allocation can succeed on 32-bit builds, e.g. with more
1909 # than 2GB RAM and a 64-bit kernel.
1910 if sys.maxsize > 0x7FFFFFFF:
1911 rawio = self.MockRawIO()
1912 bufio = self.tp(rawio)
1913 self.assertRaises((OverflowError, MemoryError, ValueError),
1914 bufio.__init__, rawio, sys.maxsize)
1915
1916 def test_garbage_collection(self):
1917 CBufferedReaderTest.test_garbage_collection(self)
1918 CBufferedWriterTest.test_garbage_collection(self)
1919
R David Murray67bfe802013-02-23 21:51:05 -05001920 def test_args_error(self):
1921 # Issue #17275
1922 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
1923 self.tp(io.BytesIO(), 1024, 1024, 1024)
1924
1925
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001926class PyBufferedRandomTest(BufferedRandomTest):
1927 tp = pyio.BufferedRandom
1928
1929
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001930# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1931# properties:
1932# - A single output character can correspond to many bytes of input.
1933# - The number of input bytes to complete the character can be
1934# undetermined until the last input byte is received.
1935# - The number of input bytes can vary depending on previous input.
1936# - A single input byte can correspond to many characters of output.
1937# - The number of output characters can be undetermined until the
1938# last input byte is received.
1939# - The number of output characters can vary depending on previous input.
1940
1941class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1942 """
1943 For testing seek/tell behavior with a stateful, buffering decoder.
1944
1945 Input is a sequence of words. Words may be fixed-length (length set
1946 by input) or variable-length (period-terminated). In variable-length
1947 mode, extra periods are ignored. Possible words are:
1948 - 'i' followed by a number sets the input length, I (maximum 99).
1949 When I is set to 0, words are space-terminated.
1950 - 'o' followed by a number sets the output length, O (maximum 99).
1951 - Any other word is converted into a word followed by a period on
1952 the output. The output word consists of the input word truncated
1953 or padded out with hyphens to make its length equal to O. If O
1954 is 0, the word is output verbatim without truncating or padding.
1955 I and O are initially set to 1. When I changes, any buffered input is
1956 re-scanned according to the new I. EOF also terminates the last word.
1957 """
1958
1959 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001960 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001961 self.reset()
1962
1963 def __repr__(self):
1964 return '<SID %x>' % id(self)
1965
1966 def reset(self):
1967 self.i = 1
1968 self.o = 1
1969 self.buffer = bytearray()
1970
1971 def getstate(self):
1972 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1973 return bytes(self.buffer), i*100 + o
1974
1975 def setstate(self, state):
1976 buffer, io = state
1977 self.buffer = bytearray(buffer)
1978 i, o = divmod(io, 100)
1979 self.i, self.o = i ^ 1, o ^ 1
1980
1981 def decode(self, input, final=False):
1982 output = ''
1983 for b in input:
1984 if self.i == 0: # variable-length, terminated with period
1985 if b == ord('.'):
1986 if self.buffer:
1987 output += self.process_word()
1988 else:
1989 self.buffer.append(b)
1990 else: # fixed-length, terminate after self.i bytes
1991 self.buffer.append(b)
1992 if len(self.buffer) == self.i:
1993 output += self.process_word()
1994 if final and self.buffer: # EOF terminates the last word
1995 output += self.process_word()
1996 return output
1997
1998 def process_word(self):
1999 output = ''
2000 if self.buffer[0] == ord('i'):
2001 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2002 elif self.buffer[0] == ord('o'):
2003 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2004 else:
2005 output = self.buffer.decode('ascii')
2006 if len(output) < self.o:
2007 output += '-'*self.o # pad out with hyphens
2008 if self.o:
2009 output = output[:self.o] # truncate to output length
2010 output += '.'
2011 self.buffer = bytearray()
2012 return output
2013
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002014 codecEnabled = False
2015
2016 @classmethod
2017 def lookupTestDecoder(cls, name):
2018 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002019 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002020 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002021 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002022 incrementalencoder=None,
2023 streamreader=None, streamwriter=None,
2024 incrementaldecoder=cls)
2025
2026# Register the previous decoder for testing.
2027# Disabled by default, tests will enable it.
2028codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2029
2030
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002031class StatefulIncrementalDecoderTest(unittest.TestCase):
2032 """
2033 Make sure the StatefulIncrementalDecoder actually works.
2034 """
2035
2036 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002037 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002038 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002039 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002040 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002041 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002042 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002043 # I=0, O=6 (variable-length input, fixed-length output)
2044 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2045 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002046 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002047 # I=6, O=3 (fixed-length input > fixed-length output)
2048 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2049 # I=0, then 3; O=29, then 15 (with longer output)
2050 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2051 'a----------------------------.' +
2052 'b----------------------------.' +
2053 'cde--------------------------.' +
2054 'abcdefghijabcde.' +
2055 'a.b------------.' +
2056 '.c.------------.' +
2057 'd.e------------.' +
2058 'k--------------.' +
2059 'l--------------.' +
2060 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002061 ]
2062
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002063 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002064 # Try a few one-shot test cases.
2065 for input, eof, output in self.test_cases:
2066 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002067 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002068
2069 # Also test an unfinished decode, followed by forcing EOF.
2070 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002071 self.assertEqual(d.decode(b'oiabcd'), '')
2072 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002073
2074class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002075
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002076 def setUp(self):
2077 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2078 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002079 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002080
Guido van Rossumd0712812007-04-11 16:32:43 +00002081 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002082 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002083
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002084 def test_constructor(self):
2085 r = self.BytesIO(b"\xc3\xa9\n\n")
2086 b = self.BufferedReader(r, 1000)
2087 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002088 t.__init__(b, encoding="latin-1", newline="\r\n")
2089 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002090 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002091 t.__init__(b, encoding="utf-8", line_buffering=True)
2092 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002093 self.assertEqual(t.line_buffering, True)
2094 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002095 self.assertRaises(TypeError, t.__init__, b, newline=42)
2096 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2097
Serhiy Storchaka6a694662015-04-16 11:54:14 +03002098 def test_uninitialized(self):
2099 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2100 del t
2101 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2102 self.assertRaises(Exception, repr, t)
2103 self.assertRaisesRegex((ValueError, AttributeError),
2104 'uninitialized|has no attribute',
2105 t.read, 0)
2106 t.__init__(self.MockRawIO())
2107 self.assertEqual(t.read(0), '')
2108
Nick Coghlana9b15242014-02-04 22:11:18 +10002109 def test_non_text_encoding_codecs_are_rejected(self):
2110 # Ensure the constructor complains if passed a codec that isn't
2111 # marked as a text encoding
2112 # http://bugs.python.org/issue20404
2113 r = self.BytesIO()
2114 b = self.BufferedWriter(r)
2115 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2116 self.TextIOWrapper(b, encoding="hex")
2117
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002118 def test_detach(self):
2119 r = self.BytesIO()
2120 b = self.BufferedWriter(r)
2121 t = self.TextIOWrapper(b)
2122 self.assertIs(t.detach(), b)
2123
2124 t = self.TextIOWrapper(b, encoding="ascii")
2125 t.write("howdy")
2126 self.assertFalse(r.getvalue())
2127 t.detach()
2128 self.assertEqual(r.getvalue(), b"howdy")
2129 self.assertRaises(ValueError, t.detach)
2130
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002131 # Operations independent of the detached stream should still work
2132 repr(t)
2133 self.assertEqual(t.encoding, "ascii")
2134 self.assertEqual(t.errors, "strict")
2135 self.assertFalse(t.line_buffering)
2136
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002137 def test_repr(self):
2138 raw = self.BytesIO("hello".encode("utf-8"))
2139 b = self.BufferedReader(raw)
2140 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002141 modname = self.TextIOWrapper.__module__
2142 self.assertEqual(repr(t),
2143 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2144 raw.name = "dummy"
2145 self.assertEqual(repr(t),
2146 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002147 t.mode = "r"
2148 self.assertEqual(repr(t),
2149 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002150 raw.name = b"dummy"
2151 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002152 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002153
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002154 t.buffer.detach()
2155 repr(t) # Should not raise an exception
2156
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002157 def test_line_buffering(self):
2158 r = self.BytesIO()
2159 b = self.BufferedWriter(r, 1000)
2160 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002161 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002162 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002163 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002164 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002165 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002166 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002167
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002168 def test_default_encoding(self):
2169 old_environ = dict(os.environ)
2170 try:
2171 # try to get a user preferred encoding different than the current
2172 # locale encoding to check that TextIOWrapper() uses the current
2173 # locale encoding and not the user preferred encoding
2174 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2175 if key in os.environ:
2176 del os.environ[key]
2177
2178 current_locale_encoding = locale.getpreferredencoding(False)
2179 b = self.BytesIO()
2180 t = self.TextIOWrapper(b)
2181 self.assertEqual(t.encoding, current_locale_encoding)
2182 finally:
2183 os.environ.clear()
2184 os.environ.update(old_environ)
2185
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002186 @support.cpython_only
Serhiy Storchaka78980432013-01-15 01:12:17 +02002187 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002188 # Issue 15989
2189 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002190 b = self.BytesIO()
2191 b.fileno = lambda: _testcapi.INT_MAX + 1
2192 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2193 b.fileno = lambda: _testcapi.UINT_MAX + 1
2194 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2195
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002196 def test_encoding(self):
2197 # Check the encoding attribute is always set, and valid
2198 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002199 t = self.TextIOWrapper(b, encoding="utf-8")
2200 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002201 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002202 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002203 codecs.lookup(t.encoding)
2204
2205 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002206 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002207 b = self.BytesIO(b"abc\n\xff\n")
2208 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002209 self.assertRaises(UnicodeError, t.read)
2210 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002211 b = self.BytesIO(b"abc\n\xff\n")
2212 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002213 self.assertRaises(UnicodeError, t.read)
2214 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002215 b = self.BytesIO(b"abc\n\xff\n")
2216 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002217 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002218 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002219 b = self.BytesIO(b"abc\n\xff\n")
2220 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002221 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002222
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002223 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002224 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002225 b = self.BytesIO()
2226 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002227 self.assertRaises(UnicodeError, t.write, "\xff")
2228 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002229 b = self.BytesIO()
2230 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002231 self.assertRaises(UnicodeError, t.write, "\xff")
2232 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002233 b = self.BytesIO()
2234 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002235 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002236 t.write("abc\xffdef\n")
2237 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002238 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002239 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002240 b = self.BytesIO()
2241 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002242 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002243 t.write("abc\xffdef\n")
2244 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002245 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002246
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002247 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002248 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2249
2250 tests = [
2251 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002252 [ '', input_lines ],
2253 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2254 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2255 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002256 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002257 encodings = (
2258 'utf-8', 'latin-1',
2259 'utf-16', 'utf-16-le', 'utf-16-be',
2260 'utf-32', 'utf-32-le', 'utf-32-be',
2261 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002262
Guido van Rossum8358db22007-08-18 21:39:55 +00002263 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002264 # character in TextIOWrapper._pending_line.
2265 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002266 # XXX: str.encode() should return bytes
2267 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002268 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002269 for bufsize in range(1, 10):
2270 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002271 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2272 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002273 encoding=encoding)
2274 if do_reads:
2275 got_lines = []
2276 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002277 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002278 if c2 == '':
2279 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002280 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002281 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002282 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002283 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002284
2285 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002286 self.assertEqual(got_line, exp_line)
2287 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002288
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002289 def test_newlines_input(self):
2290 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002291 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2292 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002293 (None, normalized.decode("ascii").splitlines(keepends=True)),
2294 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002295 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2296 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2297 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002298 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002299 buf = self.BytesIO(testdata)
2300 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002301 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002302 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002303 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002304
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002305 def test_newlines_output(self):
2306 testdict = {
2307 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2308 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2309 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2310 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2311 }
2312 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2313 for newline, expected in tests:
2314 buf = self.BytesIO()
2315 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2316 txt.write("AAA\nB")
2317 txt.write("BB\nCCC\n")
2318 txt.write("X\rY\r\nZ")
2319 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002320 self.assertEqual(buf.closed, False)
2321 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002322
2323 def test_destructor(self):
2324 l = []
2325 base = self.BytesIO
2326 class MyBytesIO(base):
2327 def close(self):
2328 l.append(self.getvalue())
2329 base.close(self)
2330 b = MyBytesIO()
2331 t = self.TextIOWrapper(b, encoding="ascii")
2332 t.write("abc")
2333 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002334 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002335 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002336
2337 def test_override_destructor(self):
2338 record = []
2339 class MyTextIO(self.TextIOWrapper):
2340 def __del__(self):
2341 record.append(1)
2342 try:
2343 f = super().__del__
2344 except AttributeError:
2345 pass
2346 else:
2347 f()
2348 def close(self):
2349 record.append(2)
2350 super().close()
2351 def flush(self):
2352 record.append(3)
2353 super().flush()
2354 b = self.BytesIO()
2355 t = MyTextIO(b, encoding="ascii")
2356 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002357 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002358 self.assertEqual(record, [1, 2, 3])
2359
2360 def test_error_through_destructor(self):
2361 # Test that the exception state is not modified by a destructor,
2362 # even if close() fails.
2363 rawio = self.CloseFailureIO()
2364 def f():
2365 self.TextIOWrapper(rawio).xyzzy
2366 with support.captured_output("stderr") as s:
2367 self.assertRaises(AttributeError, f)
2368 s = s.getvalue().strip()
2369 if s:
2370 # The destructor *may* have printed an unraisable error, check it
2371 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002372 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002373 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002374
Guido van Rossum9b76da62007-04-11 01:09:03 +00002375 # Systematic tests of the text I/O API
2376
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002377 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002378 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002379 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002380 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002381 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002382 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002383 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002384 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002385 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002386 self.assertEqual(f.tell(), 0)
2387 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002388 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002389 self.assertEqual(f.seek(0), 0)
2390 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002391 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002392 self.assertEqual(f.read(2), "ab")
2393 self.assertEqual(f.read(1), "c")
2394 self.assertEqual(f.read(1), "")
2395 self.assertEqual(f.read(), "")
2396 self.assertEqual(f.tell(), cookie)
2397 self.assertEqual(f.seek(0), 0)
2398 self.assertEqual(f.seek(0, 2), cookie)
2399 self.assertEqual(f.write("def"), 3)
2400 self.assertEqual(f.seek(cookie), cookie)
2401 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002402 if enc.startswith("utf"):
2403 self.multi_line_test(f, enc)
2404 f.close()
2405
2406 def multi_line_test(self, f, enc):
2407 f.seek(0)
2408 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002409 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002410 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002411 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002412 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002413 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002414 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002415 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002416 wlines.append((f.tell(), line))
2417 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002418 f.seek(0)
2419 rlines = []
2420 while True:
2421 pos = f.tell()
2422 line = f.readline()
2423 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002424 break
2425 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002426 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002427
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002428 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002429 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002430 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002431 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002432 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002433 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002434 p2 = f.tell()
2435 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002436 self.assertEqual(f.tell(), p0)
2437 self.assertEqual(f.readline(), "\xff\n")
2438 self.assertEqual(f.tell(), p1)
2439 self.assertEqual(f.readline(), "\xff\n")
2440 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002441 f.seek(0)
2442 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002443 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002444 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002445 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002446 f.close()
2447
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002448 def test_seeking(self):
2449 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002450 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002451 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002452 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002453 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002454 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002455 suffix = bytes(u_suffix.encode("utf-8"))
2456 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002457 with self.open(support.TESTFN, "wb") as f:
2458 f.write(line*2)
2459 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2460 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002461 self.assertEqual(s, str(prefix, "ascii"))
2462 self.assertEqual(f.tell(), prefix_size)
2463 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002464
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002465 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002466 # Regression test for a specific bug
2467 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002468 with self.open(support.TESTFN, "wb") as f:
2469 f.write(data)
2470 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2471 f._CHUNK_SIZE # Just test that it exists
2472 f._CHUNK_SIZE = 2
2473 f.readline()
2474 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002475
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002476 def test_seek_and_tell(self):
2477 #Test seek/tell using the StatefulIncrementalDecoder.
2478 # Make test faster by doing smaller seeks
2479 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002480
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002481 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002482 """Tell/seek to various points within a data stream and ensure
2483 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002484 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002485 f.write(data)
2486 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002487 f = self.open(support.TESTFN, encoding='test_decoder')
2488 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002489 decoded = f.read()
2490 f.close()
2491
Neal Norwitze2b07052008-03-18 19:52:05 +00002492 for i in range(min_pos, len(decoded) + 1): # seek positions
2493 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002494 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002495 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002496 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002497 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002498 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002499 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002500 f.close()
2501
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002502 # Enable the test decoder.
2503 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002504
2505 # Run the tests.
2506 try:
2507 # Try each test case.
2508 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002509 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002510
2511 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002512 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2513 offset = CHUNK_SIZE - len(input)//2
2514 prefix = b'.'*offset
2515 # Don't bother seeking into the prefix (takes too long).
2516 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002517 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002518
2519 # Ensure our test decoder won't interfere with subsequent tests.
2520 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002521 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002522
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002523 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002524 data = "1234567890"
2525 tests = ("utf-16",
2526 "utf-16-le",
2527 "utf-16-be",
2528 "utf-32",
2529 "utf-32-le",
2530 "utf-32-be")
2531 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002532 buf = self.BytesIO()
2533 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002534 # Check if the BOM is written only once (see issue1753).
2535 f.write(data)
2536 f.write(data)
2537 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002538 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002539 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002540 self.assertEqual(f.read(), data * 2)
2541 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002542
Benjamin Petersona1b49012009-03-31 23:11:32 +00002543 def test_unreadable(self):
2544 class UnReadable(self.BytesIO):
2545 def readable(self):
2546 return False
2547 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002548 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002549
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002550 def test_read_one_by_one(self):
2551 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002552 reads = ""
2553 while True:
2554 c = txt.read(1)
2555 if not c:
2556 break
2557 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002558 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002559
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002560 def test_readlines(self):
2561 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2562 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2563 txt.seek(0)
2564 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2565 txt.seek(0)
2566 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2567
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002568 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002569 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002570 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002571 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002572 reads = ""
2573 while True:
2574 c = txt.read(128)
2575 if not c:
2576 break
2577 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002578 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002579
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002580 def test_writelines(self):
2581 l = ['ab', 'cd', 'ef']
2582 buf = self.BytesIO()
2583 txt = self.TextIOWrapper(buf)
2584 txt.writelines(l)
2585 txt.flush()
2586 self.assertEqual(buf.getvalue(), b'abcdef')
2587
2588 def test_writelines_userlist(self):
2589 l = UserList(['ab', 'cd', 'ef'])
2590 buf = self.BytesIO()
2591 txt = self.TextIOWrapper(buf)
2592 txt.writelines(l)
2593 txt.flush()
2594 self.assertEqual(buf.getvalue(), b'abcdef')
2595
2596 def test_writelines_error(self):
2597 txt = self.TextIOWrapper(self.BytesIO())
2598 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2599 self.assertRaises(TypeError, txt.writelines, None)
2600 self.assertRaises(TypeError, txt.writelines, b'abc')
2601
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002602 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002603 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002604
2605 # read one char at a time
2606 reads = ""
2607 while True:
2608 c = txt.read(1)
2609 if not c:
2610 break
2611 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002612 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002613
2614 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002615 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002616 txt._CHUNK_SIZE = 4
2617
2618 reads = ""
2619 while True:
2620 c = txt.read(4)
2621 if not c:
2622 break
2623 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002624 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002625
2626 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002627 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002628 txt._CHUNK_SIZE = 4
2629
2630 reads = txt.read(4)
2631 reads += txt.read(4)
2632 reads += txt.readline()
2633 reads += txt.readline()
2634 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002635 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002636
2637 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002638 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002639 txt._CHUNK_SIZE = 4
2640
2641 reads = txt.read(4)
2642 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002643 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002644
2645 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002646 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002647 txt._CHUNK_SIZE = 4
2648
2649 reads = txt.read(4)
2650 pos = txt.tell()
2651 txt.seek(0)
2652 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002653 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002654
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002655 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002656 buffer = self.BytesIO(self.testdata)
2657 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002658
2659 self.assertEqual(buffer.seekable(), txt.seekable())
2660
Antoine Pitroue4501852009-05-14 18:55:55 +00002661 def test_append_bom(self):
2662 # The BOM is not written again when appending to a non-empty file
2663 filename = support.TESTFN
2664 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2665 with self.open(filename, 'w', encoding=charset) as f:
2666 f.write('aaa')
2667 pos = f.tell()
2668 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002669 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002670
2671 with self.open(filename, 'a', encoding=charset) as f:
2672 f.write('xxx')
2673 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002674 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002675
2676 def test_seek_bom(self):
2677 # Same test, but when seeking manually
2678 filename = support.TESTFN
2679 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2680 with self.open(filename, 'w', encoding=charset) as f:
2681 f.write('aaa')
2682 pos = f.tell()
2683 with self.open(filename, 'r+', encoding=charset) as f:
2684 f.seek(pos)
2685 f.write('zzz')
2686 f.seek(0)
2687 f.write('bbb')
2688 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002689 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002690
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02002691 def test_seek_append_bom(self):
2692 # Same test, but first seek to the start and then to the end
2693 filename = support.TESTFN
2694 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2695 with self.open(filename, 'w', encoding=charset) as f:
2696 f.write('aaa')
2697 with self.open(filename, 'a', encoding=charset) as f:
2698 f.seek(0)
2699 f.seek(0, self.SEEK_END)
2700 f.write('xxx')
2701 with self.open(filename, 'rb') as f:
2702 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2703
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002704 def test_errors_property(self):
2705 with self.open(support.TESTFN, "w") as f:
2706 self.assertEqual(f.errors, "strict")
2707 with self.open(support.TESTFN, "w", errors="replace") as f:
2708 self.assertEqual(f.errors, "replace")
2709
Brett Cannon31f59292011-02-21 19:29:56 +00002710 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002711 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002712 def test_threads_write(self):
2713 # Issue6750: concurrent writes could duplicate data
2714 event = threading.Event()
2715 with self.open(support.TESTFN, "w", buffering=1) as f:
2716 def run(n):
2717 text = "Thread%03d\n" % n
2718 event.wait()
2719 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03002720 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002721 for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03002722 with support.start_threads(threads, event.set):
2723 time.sleep(0.02)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002724 with self.open(support.TESTFN) as f:
2725 content = f.read()
2726 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002727 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002728
Antoine Pitrou6be88762010-05-03 16:48:20 +00002729 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002730 # Test that text file is closed despite failed flush
2731 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00002732 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002733 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00002734 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002735 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002736 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002737 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002738 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002739 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002740 self.assertTrue(txt.buffer.closed)
2741 self.assertTrue(closed) # flush() called
2742 self.assertFalse(closed[0]) # flush() called before file closed
2743 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02002744 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00002745
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03002746 def test_close_error_on_close(self):
2747 buffer = self.BytesIO(self.testdata)
2748 def bad_flush():
2749 raise OSError('flush')
2750 def bad_close():
2751 raise OSError('close')
2752 buffer.close = bad_close
2753 txt = self.TextIOWrapper(buffer, encoding="ascii")
2754 txt.flush = bad_flush
2755 with self.assertRaises(OSError) as err: # exception not swallowed
2756 txt.close()
2757 self.assertEqual(err.exception.args, ('close',))
2758 self.assertIsInstance(err.exception.__context__, OSError)
2759 self.assertEqual(err.exception.__context__.args, ('flush',))
2760 self.assertFalse(txt.closed)
2761
2762 def test_nonnormalized_close_error_on_close(self):
2763 # Issue #21677
2764 buffer = self.BytesIO(self.testdata)
2765 def bad_flush():
2766 raise non_existing_flush
2767 def bad_close():
2768 raise non_existing_close
2769 buffer.close = bad_close
2770 txt = self.TextIOWrapper(buffer, encoding="ascii")
2771 txt.flush = bad_flush
2772 with self.assertRaises(NameError) as err: # exception not swallowed
2773 txt.close()
2774 self.assertIn('non_existing_close', str(err.exception))
2775 self.assertIsInstance(err.exception.__context__, NameError)
2776 self.assertIn('non_existing_flush', str(err.exception.__context__))
2777 self.assertFalse(txt.closed)
2778
Antoine Pitrou6be88762010-05-03 16:48:20 +00002779 def test_multi_close(self):
2780 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2781 txt.close()
2782 txt.close()
2783 txt.close()
2784 self.assertRaises(ValueError, txt.flush)
2785
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002786 def test_unseekable(self):
2787 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2788 self.assertRaises(self.UnsupportedOperation, txt.tell)
2789 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2790
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002791 def test_readonly_attributes(self):
2792 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2793 buf = self.BytesIO(self.testdata)
2794 with self.assertRaises(AttributeError):
2795 txt.buffer = buf
2796
Antoine Pitroue96ec682011-07-23 21:46:35 +02002797 def test_rawio(self):
2798 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2799 # that subprocess.Popen() can have the required unbuffered
2800 # semantics with universal_newlines=True.
2801 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2802 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2803 # Reads
2804 self.assertEqual(txt.read(4), 'abcd')
2805 self.assertEqual(txt.readline(), 'efghi\n')
2806 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2807
2808 def test_rawio_write_through(self):
2809 # Issue #12591: with write_through=True, writes don't need a flush
2810 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2811 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2812 write_through=True)
2813 txt.write('1')
2814 txt.write('23\n4')
2815 txt.write('5')
2816 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2817
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02002818 def test_bufio_write_through(self):
2819 # Issue #21396: write_through=True doesn't force a flush()
2820 # on the underlying binary buffered object.
2821 flush_called, write_called = [], []
2822 class BufferedWriter(self.BufferedWriter):
2823 def flush(self, *args, **kwargs):
2824 flush_called.append(True)
2825 return super().flush(*args, **kwargs)
2826 def write(self, *args, **kwargs):
2827 write_called.append(True)
2828 return super().write(*args, **kwargs)
2829
2830 rawio = self.BytesIO()
2831 data = b"a"
2832 bufio = BufferedWriter(rawio, len(data)*2)
2833 textio = self.TextIOWrapper(bufio, encoding='ascii',
2834 write_through=True)
2835 # write to the buffered io but don't overflow the buffer
2836 text = data.decode('ascii')
2837 textio.write(text)
2838
2839 # buffer.flush is not called with write_through=True
2840 self.assertFalse(flush_called)
2841 # buffer.write *is* called with write_through=True
2842 self.assertTrue(write_called)
2843 self.assertEqual(rawio.getvalue(), b"") # no flush
2844
2845 write_called = [] # reset
2846 textio.write(text * 10) # total content is larger than bufio buffer
2847 self.assertTrue(write_called)
2848 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
2849
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002850 def test_read_nonbytes(self):
2851 # Issue #17106
2852 # Crash when underlying read() returns non-bytes
2853 t = self.TextIOWrapper(self.StringIO('a'))
2854 self.assertRaises(TypeError, t.read, 1)
2855 t = self.TextIOWrapper(self.StringIO('a'))
2856 self.assertRaises(TypeError, t.readline)
2857 t = self.TextIOWrapper(self.StringIO('a'))
2858 self.assertRaises(TypeError, t.read)
2859
2860 def test_illegal_decoder(self):
2861 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10002862 # Bypass the early encoding check added in issue 20404
2863 def _make_illegal_wrapper():
2864 quopri = codecs.lookup("quopri")
2865 quopri._is_text_encoding = True
2866 try:
2867 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2868 newline='\n', encoding="quopri")
2869 finally:
2870 quopri._is_text_encoding = False
2871 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002872 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10002873 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002874 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10002875 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002876 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10002877 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002878 self.assertRaises(TypeError, t.read)
2879
Antoine Pitrou712cb732013-12-21 15:51:54 +01002880 def _check_create_at_shutdown(self, **kwargs):
2881 # Issue #20037: creating a TextIOWrapper at shutdown
2882 # shouldn't crash the interpreter.
2883 iomod = self.io.__name__
2884 code = """if 1:
2885 import codecs
2886 import {iomod} as io
2887
2888 # Avoid looking up codecs at shutdown
2889 codecs.lookup('utf-8')
2890
2891 class C:
2892 def __init__(self):
2893 self.buf = io.BytesIO()
2894 def __del__(self):
2895 io.TextIOWrapper(self.buf, **{kwargs})
2896 print("ok")
2897 c = C()
2898 """.format(iomod=iomod, kwargs=kwargs)
2899 return assert_python_ok("-c", code)
2900
2901 def test_create_at_shutdown_without_encoding(self):
2902 rc, out, err = self._check_create_at_shutdown()
2903 if err:
2904 # Can error out with a RuntimeError if the module state
2905 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10002906 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01002907 else:
2908 self.assertEqual("ok", out.decode().strip())
2909
2910 def test_create_at_shutdown_with_encoding(self):
2911 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
2912 errors='strict')
2913 self.assertFalse(err)
2914 self.assertEqual("ok", out.decode().strip())
2915
Benjamin Peterson6c14f232014-11-12 10:19:46 -05002916 def test_issue22849(self):
2917 class F(object):
2918 def readable(self): return True
2919 def writable(self): return True
2920 def seekable(self): return True
2921
2922 for i in range(10):
2923 try:
2924 self.TextIOWrapper(F(), encoding='utf-8')
2925 except Exception:
2926 pass
2927
2928 F.tell = lambda x: 0
2929 t = self.TextIOWrapper(F(), encoding='utf-8')
2930
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002931
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002932class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01002933 io = io
Nick Coghlana9b15242014-02-04 22:11:18 +10002934 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002935
2936 def test_initialization(self):
2937 r = self.BytesIO(b"\xc3\xa9\n\n")
2938 b = self.BufferedReader(r, 1000)
2939 t = self.TextIOWrapper(b)
2940 self.assertRaises(TypeError, t.__init__, b, newline=42)
2941 self.assertRaises(ValueError, t.read)
2942 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2943 self.assertRaises(ValueError, t.read)
2944
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002945 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2946 self.assertRaises(Exception, repr, t)
2947
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002948 def test_garbage_collection(self):
2949 # C TextIOWrapper objects are collected, and collecting them flushes
2950 # all data to disk.
2951 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02002952 with support.check_warnings(('', ResourceWarning)):
2953 rawio = io.FileIO(support.TESTFN, "wb")
2954 b = self.BufferedWriter(rawio)
2955 t = self.TextIOWrapper(b, encoding="ascii")
2956 t.write("456def")
2957 t.x = t
2958 wr = weakref.ref(t)
2959 del t
2960 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002961 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002962 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002963 self.assertEqual(f.read(), b"456def")
2964
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002965 def test_rwpair_cleared_before_textio(self):
2966 # Issue 13070: TextIOWrapper's finalization would crash when called
2967 # after the reference to the underlying BufferedRWPair's writer got
2968 # cleared by the GC.
2969 for i in range(1000):
2970 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2971 t1 = self.TextIOWrapper(b1, encoding="ascii")
2972 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2973 t2 = self.TextIOWrapper(b2, encoding="ascii")
2974 # circular references
2975 t1.buddy = t2
2976 t2.buddy = t1
2977 support.gc_collect()
2978
2979
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002980class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01002981 io = pyio
Serhiy Storchakad667d722014-02-10 19:09:19 +02002982 #shutdown_error = "LookupError: unknown encoding: ascii"
2983 shutdown_error = "TypeError: 'NoneType' object is not iterable"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002984
2985
2986class IncrementalNewlineDecoderTest(unittest.TestCase):
2987
2988 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002989 # UTF-8 specific tests for a newline decoder
2990 def _check_decode(b, s, **kwargs):
2991 # We exercise getstate() / setstate() as well as decode()
2992 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002993 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002994 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002995 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002996
Antoine Pitrou180a3362008-12-14 16:36:46 +00002997 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002998
Antoine Pitrou180a3362008-12-14 16:36:46 +00002999 _check_decode(b'\xe8', "")
3000 _check_decode(b'\xa2', "")
3001 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003002
Antoine Pitrou180a3362008-12-14 16:36:46 +00003003 _check_decode(b'\xe8', "")
3004 _check_decode(b'\xa2', "")
3005 _check_decode(b'\x88', "\u8888")
3006
3007 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003008 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3009
Antoine Pitrou180a3362008-12-14 16:36:46 +00003010 decoder.reset()
3011 _check_decode(b'\n', "\n")
3012 _check_decode(b'\r', "")
3013 _check_decode(b'', "\n", final=True)
3014 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003015
Antoine Pitrou180a3362008-12-14 16:36:46 +00003016 _check_decode(b'\r', "")
3017 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003018
Antoine Pitrou180a3362008-12-14 16:36:46 +00003019 _check_decode(b'\r\r\n', "\n\n")
3020 _check_decode(b'\r', "")
3021 _check_decode(b'\r', "\n")
3022 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003023
Antoine Pitrou180a3362008-12-14 16:36:46 +00003024 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3025 _check_decode(b'\xe8\xa2\x88', "\u8888")
3026 _check_decode(b'\n', "\n")
3027 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3028 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003029
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003030 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003031 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003032 if encoding is not None:
3033 encoder = codecs.getincrementalencoder(encoding)()
3034 def _decode_bytewise(s):
3035 # Decode one byte at a time
3036 for b in encoder.encode(s):
3037 result.append(decoder.decode(bytes([b])))
3038 else:
3039 encoder = None
3040 def _decode_bytewise(s):
3041 # Decode one char at a time
3042 for c in s:
3043 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003044 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003045 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003046 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003047 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003048 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003049 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003050 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003051 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003052 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003053 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003054 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003055 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003056 input = "abc"
3057 if encoder is not None:
3058 encoder.reset()
3059 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003060 self.assertEqual(decoder.decode(input), "abc")
3061 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003062
3063 def test_newline_decoder(self):
3064 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003065 # None meaning the IncrementalNewlineDecoder takes unicode input
3066 # rather than bytes input
3067 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003068 'utf-16', 'utf-16-le', 'utf-16-be',
3069 'utf-32', 'utf-32-le', 'utf-32-be',
3070 )
3071 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003072 decoder = enc and codecs.getincrementaldecoder(enc)()
3073 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3074 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003075 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003076 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3077 self.check_newline_decoding_utf8(decoder)
3078
Antoine Pitrou66913e22009-03-06 23:40:56 +00003079 def test_newline_bytes(self):
3080 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3081 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003082 self.assertEqual(dec.newlines, None)
3083 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3084 self.assertEqual(dec.newlines, None)
3085 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3086 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003087 dec = self.IncrementalNewlineDecoder(None, translate=False)
3088 _check(dec)
3089 dec = self.IncrementalNewlineDecoder(None, translate=True)
3090 _check(dec)
3091
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003092class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3093 pass
3094
3095class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3096 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003097
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003098
Guido van Rossum01a27522007-03-07 01:00:12 +00003099# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003100
Guido van Rossum5abbf752007-08-27 17:39:33 +00003101class MiscIOTest(unittest.TestCase):
3102
Barry Warsaw40e82462008-11-20 20:14:50 +00003103 def tearDown(self):
3104 support.unlink(support.TESTFN)
3105
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003106 def test___all__(self):
3107 for name in self.io.__all__:
3108 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003109 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003110 if name == "open":
3111 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003112 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003113 self.assertTrue(issubclass(obj, Exception), name)
3114 elif not name.startswith("SEEK_"):
3115 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003116
Barry Warsaw40e82462008-11-20 20:14:50 +00003117 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003118 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003119 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003120 f.close()
3121
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003122 with support.check_warnings(('', DeprecationWarning)):
3123 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003124 self.assertEqual(f.name, support.TESTFN)
3125 self.assertEqual(f.buffer.name, support.TESTFN)
3126 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3127 self.assertEqual(f.mode, "U")
3128 self.assertEqual(f.buffer.mode, "rb")
3129 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003130 f.close()
3131
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003132 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003133 self.assertEqual(f.mode, "w+")
3134 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3135 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003136
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003137 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003138 self.assertEqual(g.mode, "wb")
3139 self.assertEqual(g.raw.mode, "wb")
3140 self.assertEqual(g.name, f.fileno())
3141 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003142 f.close()
3143 g.close()
3144
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003145 def test_io_after_close(self):
3146 for kwargs in [
3147 {"mode": "w"},
3148 {"mode": "wb"},
3149 {"mode": "w", "buffering": 1},
3150 {"mode": "w", "buffering": 2},
3151 {"mode": "wb", "buffering": 0},
3152 {"mode": "r"},
3153 {"mode": "rb"},
3154 {"mode": "r", "buffering": 1},
3155 {"mode": "r", "buffering": 2},
3156 {"mode": "rb", "buffering": 0},
3157 {"mode": "w+"},
3158 {"mode": "w+b"},
3159 {"mode": "w+", "buffering": 1},
3160 {"mode": "w+", "buffering": 2},
3161 {"mode": "w+b", "buffering": 0},
3162 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003163 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003164 f.close()
3165 self.assertRaises(ValueError, f.flush)
3166 self.assertRaises(ValueError, f.fileno)
3167 self.assertRaises(ValueError, f.isatty)
3168 self.assertRaises(ValueError, f.__iter__)
3169 if hasattr(f, "peek"):
3170 self.assertRaises(ValueError, f.peek, 1)
3171 self.assertRaises(ValueError, f.read)
3172 if hasattr(f, "read1"):
3173 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003174 if hasattr(f, "readall"):
3175 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003176 if hasattr(f, "readinto"):
3177 self.assertRaises(ValueError, f.readinto, bytearray(1024))
3178 self.assertRaises(ValueError, f.readline)
3179 self.assertRaises(ValueError, f.readlines)
3180 self.assertRaises(ValueError, f.seek, 0)
3181 self.assertRaises(ValueError, f.tell)
3182 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003183 self.assertRaises(ValueError, f.write,
3184 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003185 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003186 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003188 def test_blockingioerror(self):
3189 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003190 class C(str):
3191 pass
3192 c = C("")
3193 b = self.BlockingIOError(1, c)
3194 c.b = b
3195 b.c = c
3196 wr = weakref.ref(c)
3197 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003198 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003199 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003200
3201 def test_abcs(self):
3202 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003203 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3204 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3205 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3206 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003207
3208 def _check_abc_inheritance(self, abcmodule):
3209 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003210 self.assertIsInstance(f, abcmodule.IOBase)
3211 self.assertIsInstance(f, abcmodule.RawIOBase)
3212 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3213 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003214 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003215 self.assertIsInstance(f, abcmodule.IOBase)
3216 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3217 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3218 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003219 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003220 self.assertIsInstance(f, abcmodule.IOBase)
3221 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3222 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3223 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003224
3225 def test_abc_inheritance(self):
3226 # Test implementations inherit from their respective ABCs
3227 self._check_abc_inheritance(self)
3228
3229 def test_abc_inheritance_official(self):
3230 # Test implementations inherit from the official ABCs of the
3231 # baseline "io" module.
3232 self._check_abc_inheritance(io)
3233
Antoine Pitroue033e062010-10-29 10:38:18 +00003234 def _check_warn_on_dealloc(self, *args, **kwargs):
3235 f = open(*args, **kwargs)
3236 r = repr(f)
3237 with self.assertWarns(ResourceWarning) as cm:
3238 f = None
3239 support.gc_collect()
3240 self.assertIn(r, str(cm.warning.args[0]))
3241
3242 def test_warn_on_dealloc(self):
3243 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3244 self._check_warn_on_dealloc(support.TESTFN, "wb")
3245 self._check_warn_on_dealloc(support.TESTFN, "w")
3246
3247 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3248 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003249 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003250 for fd in fds:
3251 try:
3252 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003253 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003254 if e.errno != errno.EBADF:
3255 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003256 self.addCleanup(cleanup_fds)
3257 r, w = os.pipe()
3258 fds += r, w
3259 self._check_warn_on_dealloc(r, *args, **kwargs)
3260 # When using closefd=False, there's no warning
3261 r, w = os.pipe()
3262 fds += r, w
3263 with warnings.catch_warnings(record=True) as recorded:
3264 open(r, *args, closefd=False, **kwargs)
3265 support.gc_collect()
3266 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00003267
3268 def test_warn_on_dealloc_fd(self):
3269 self._check_warn_on_dealloc_fd("rb", buffering=0)
3270 self._check_warn_on_dealloc_fd("rb")
3271 self._check_warn_on_dealloc_fd("r")
3272
3273
Antoine Pitrou243757e2010-11-05 21:15:39 +00003274 def test_pickling(self):
3275 # Pickling file objects is forbidden
3276 for kwargs in [
3277 {"mode": "w"},
3278 {"mode": "wb"},
3279 {"mode": "wb", "buffering": 0},
3280 {"mode": "r"},
3281 {"mode": "rb"},
3282 {"mode": "rb", "buffering": 0},
3283 {"mode": "w+"},
3284 {"mode": "w+b"},
3285 {"mode": "w+b", "buffering": 0},
3286 ]:
3287 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3288 with self.open(support.TESTFN, **kwargs) as f:
3289 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3290
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003291 @unittest.skipUnless(fcntl, 'fcntl required for this test')
3292 def test_nonblock_pipe_write_bigbuf(self):
3293 self._test_nonblock_pipe_write(16*1024)
3294
3295 @unittest.skipUnless(fcntl, 'fcntl required for this test')
3296 def test_nonblock_pipe_write_smallbuf(self):
3297 self._test_nonblock_pipe_write(1024)
3298
3299 def _set_non_blocking(self, fd):
3300 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
3301 self.assertNotEqual(flags, -1)
3302 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
3303 self.assertEqual(res, 0)
3304
3305 def _test_nonblock_pipe_write(self, bufsize):
3306 sent = []
3307 received = []
3308 r, w = os.pipe()
3309 self._set_non_blocking(r)
3310 self._set_non_blocking(w)
3311
3312 # To exercise all code paths in the C implementation we need
3313 # to play with buffer sizes. For instance, if we choose a
3314 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3315 # then we will never get a partial write of the buffer.
3316 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3317 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3318
3319 with rf, wf:
3320 for N in 9999, 73, 7574:
3321 try:
3322 i = 0
3323 while True:
3324 msg = bytes([i % 26 + 97]) * N
3325 sent.append(msg)
3326 wf.write(msg)
3327 i += 1
3328
3329 except self.BlockingIOError as e:
3330 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003331 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003332 sent[-1] = sent[-1][:e.characters_written]
3333 received.append(rf.read())
3334 msg = b'BLOCKED'
3335 wf.write(msg)
3336 sent.append(msg)
3337
3338 while True:
3339 try:
3340 wf.flush()
3341 break
3342 except self.BlockingIOError as e:
3343 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003344 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003345 self.assertEqual(e.characters_written, 0)
3346 received.append(rf.read())
3347
3348 received += iter(rf.read, None)
3349
3350 sent, received = b''.join(sent), b''.join(received)
3351 self.assertTrue(sent == received)
3352 self.assertTrue(wf.closed)
3353 self.assertTrue(rf.closed)
3354
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003355 def test_create_fail(self):
3356 # 'x' mode fails if file is existing
3357 with self.open(support.TESTFN, 'w'):
3358 pass
3359 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3360
3361 def test_create_writes(self):
3362 # 'x' mode opens for writing
3363 with self.open(support.TESTFN, 'xb') as f:
3364 f.write(b"spam")
3365 with self.open(support.TESTFN, 'rb') as f:
3366 self.assertEqual(b"spam", f.read())
3367
Christian Heimes7b648752012-09-10 14:48:43 +02003368 def test_open_allargs(self):
3369 # there used to be a buffer overflow in the parser for rawmode
3370 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3371
3372
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003373class CMiscIOTest(MiscIOTest):
3374 io = io
3375
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03003376 def test_readinto_buffer_overflow(self):
3377 # Issue #18025
3378 class BadReader(self.io.BufferedIOBase):
3379 def read(self, n=-1):
3380 return b'x' * 10**6
3381 bufio = BadReader()
3382 b = bytearray(2)
3383 self.assertRaises(ValueError, bufio.readinto, b)
3384
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003385 @unittest.skipUnless(threading, 'Threading required for this test.')
3386 def check_daemon_threads_shutdown_deadlock(self, stream_name):
3387 # Issue #23309: deadlocks at shutdown should be avoided when a
3388 # daemon thread and the main thread both write to a file.
3389 code = """if 1:
3390 import sys
3391 import time
3392 import threading
3393
3394 file = sys.{stream_name}
3395
3396 def run():
3397 while True:
3398 file.write('.')
3399 file.flush()
3400
3401 thread = threading.Thread(target=run)
3402 thread.daemon = True
3403 thread.start()
3404
3405 time.sleep(0.5)
3406 file.write('!')
3407 file.flush()
3408 """.format_map(locals())
3409 res, _ = run_python_until_end("-c", code)
3410 err = res.err.decode()
3411 if res.rc != 0:
3412 # Failure: should be a fatal error
3413 self.assertIn("Fatal Python error: could not acquire lock "
3414 "for <_io.BufferedWriter name='<{stream_name}>'> "
3415 "at interpreter shutdown, possibly due to "
3416 "daemon threads".format_map(locals()),
3417 err)
3418 else:
3419 self.assertFalse(err.strip('.!'))
3420
3421 def test_daemon_threads_shutdown_stdout_deadlock(self):
3422 self.check_daemon_threads_shutdown_deadlock('stdout')
3423
3424 def test_daemon_threads_shutdown_stderr_deadlock(self):
3425 self.check_daemon_threads_shutdown_deadlock('stderr')
3426
3427
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003428class PyMiscIOTest(MiscIOTest):
3429 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003430
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003431
3432@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3433class SignalsTest(unittest.TestCase):
3434
3435 def setUp(self):
3436 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3437
3438 def tearDown(self):
3439 signal.signal(signal.SIGALRM, self.oldalrm)
3440
3441 def alarm_interrupt(self, sig, frame):
3442 1/0
3443
3444 @unittest.skipUnless(threading, 'Threading required for this test.')
3445 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3446 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003447 invokes the signal handler, and bubbles up the exception raised
3448 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003449 read_results = []
3450 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003451 if hasattr(signal, 'pthread_sigmask'):
3452 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003453 s = os.read(r, 1)
3454 read_results.append(s)
3455 t = threading.Thread(target=_read)
3456 t.daemon = True
3457 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003458 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003459 try:
3460 wio = self.io.open(w, **fdopen_kwargs)
3461 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003462 # Fill the pipe enough that the write will be blocking.
3463 # It will be interrupted by the timer armed above. Since the
3464 # other thread has read one byte, the low-level write will
3465 # return with a successful (partial) result rather than an EINTR.
3466 # The buffered IO layer must check for pending signal
3467 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003468 signal.alarm(1)
3469 try:
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003470 with self.assertRaises(ZeroDivisionError):
3471 wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
Victor Stinner775b2dd2013-07-15 19:53:13 +02003472 finally:
3473 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003474 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003475 # We got one byte, get another one and check that it isn't a
3476 # repeat of the first one.
3477 read_results.append(os.read(r, 1))
3478 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3479 finally:
3480 os.close(w)
3481 os.close(r)
3482 # This is deliberate. If we didn't close the file descriptor
3483 # before closing wio, wio would try to flush its internal
3484 # buffer, and block again.
3485 try:
3486 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003487 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003488 if e.errno != errno.EBADF:
3489 raise
3490
3491 def test_interrupted_write_unbuffered(self):
3492 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3493
3494 def test_interrupted_write_buffered(self):
3495 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3496
Victor Stinner6ab72862014-09-03 23:32:28 +02003497 # Issue #22331: The test hangs on FreeBSD 7.2
3498 @support.requires_freebsd_version(8)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003499 def test_interrupted_write_text(self):
3500 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3501
Brett Cannon31f59292011-02-21 19:29:56 +00003502 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003503 def check_reentrant_write(self, data, **fdopen_kwargs):
3504 def on_alarm(*args):
3505 # Will be called reentrantly from the same thread
3506 wio.write(data)
3507 1/0
3508 signal.signal(signal.SIGALRM, on_alarm)
3509 r, w = os.pipe()
3510 wio = self.io.open(w, **fdopen_kwargs)
3511 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003512 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003513 # Either the reentrant call to wio.write() fails with RuntimeError,
3514 # or the signal handler raises ZeroDivisionError.
3515 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3516 while 1:
3517 for i in range(100):
3518 wio.write(data)
3519 wio.flush()
3520 # Make sure the buffer doesn't fill up and block further writes
3521 os.read(r, len(data) * 100)
3522 exc = cm.exception
3523 if isinstance(exc, RuntimeError):
3524 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3525 finally:
3526 wio.close()
3527 os.close(r)
3528
3529 def test_reentrant_write_buffered(self):
3530 self.check_reentrant_write(b"xy", mode="wb")
3531
3532 def test_reentrant_write_text(self):
3533 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3534
Antoine Pitrou707ce822011-02-25 21:24:11 +00003535 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3536 """Check that a buffered read, when it gets interrupted (either
3537 returning a partial result or EINTR), properly invokes the signal
3538 handler and retries if the latter returned successfully."""
3539 r, w = os.pipe()
3540 fdopen_kwargs["closefd"] = False
3541 def alarm_handler(sig, frame):
3542 os.write(w, b"bar")
3543 signal.signal(signal.SIGALRM, alarm_handler)
3544 try:
3545 rio = self.io.open(r, **fdopen_kwargs)
3546 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003547 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003548 # Expected behaviour:
3549 # - first raw read() returns partial b"foo"
3550 # - second raw read() returns EINTR
3551 # - third raw read() returns b"bar"
3552 self.assertEqual(decode(rio.read(6)), "foobar")
3553 finally:
3554 rio.close()
3555 os.close(w)
3556 os.close(r)
3557
Antoine Pitrou20db5112011-08-19 20:32:34 +02003558 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003559 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3560 mode="rb")
3561
Antoine Pitrou20db5112011-08-19 20:32:34 +02003562 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003563 self.check_interrupted_read_retry(lambda x: x,
3564 mode="r")
3565
3566 @unittest.skipUnless(threading, 'Threading required for this test.')
3567 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3568 """Check that a buffered write, when it gets interrupted (either
3569 returning a partial result or EINTR), properly invokes the signal
3570 handler and retries if the latter returned successfully."""
3571 select = support.import_module("select")
3572 # A quantity that exceeds the buffer size of an anonymous pipe's
3573 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02003574 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00003575 r, w = os.pipe()
3576 fdopen_kwargs["closefd"] = False
3577 # We need a separate thread to read from the pipe and allow the
3578 # write() to finish. This thread is started after the SIGALRM is
3579 # received (forcing a first EINTR in write()).
3580 read_results = []
3581 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003582 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00003583 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003584 try:
3585 while not write_finished:
3586 while r in select.select([r], [], [], 1.0)[0]:
3587 s = os.read(r, 1024)
3588 read_results.append(s)
3589 except BaseException as exc:
3590 nonlocal error
3591 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00003592 t = threading.Thread(target=_read)
3593 t.daemon = True
3594 def alarm1(sig, frame):
3595 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003596 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003597 def alarm2(sig, frame):
3598 t.start()
3599 signal.signal(signal.SIGALRM, alarm1)
3600 try:
3601 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003602 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003603 # Expected behaviour:
3604 # - first raw write() is partial (because of the limited pipe buffer
3605 # and the first alarm)
3606 # - second raw write() returns EINTR (because of the second alarm)
3607 # - subsequent write()s are successful (either partial or complete)
3608 self.assertEqual(N, wio.write(item * N))
3609 wio.flush()
3610 write_finished = True
3611 t.join()
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003612
3613 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003614 self.assertEqual(N, sum(len(x) for x in read_results))
3615 finally:
3616 write_finished = True
3617 os.close(w)
3618 os.close(r)
3619 # This is deliberate. If we didn't close the file descriptor
3620 # before closing wio, wio would try to flush its internal
3621 # buffer, and could block (in case of failure).
3622 try:
3623 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003624 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00003625 if e.errno != errno.EBADF:
3626 raise
3627
Antoine Pitrou20db5112011-08-19 20:32:34 +02003628 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003629 self.check_interrupted_write_retry(b"x", mode="wb")
3630
Antoine Pitrou20db5112011-08-19 20:32:34 +02003631 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003632 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3633
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003634
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003635class CSignalsTest(SignalsTest):
3636 io = io
3637
3638class PySignalsTest(SignalsTest):
3639 io = pyio
3640
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003641 # Handling reentrancy issues would slow down _pyio even more, so the
3642 # tests are disabled.
3643 test_reentrant_write_buffered = None
3644 test_reentrant_write_text = None
3645
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003646
Ezio Melottidaa42c72013-03-23 16:30:16 +02003647def load_tests(*args):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003648 tests = (CIOTest, PyIOTest,
3649 CBufferedReaderTest, PyBufferedReaderTest,
3650 CBufferedWriterTest, PyBufferedWriterTest,
3651 CBufferedRWPairTest, PyBufferedRWPairTest,
3652 CBufferedRandomTest, PyBufferedRandomTest,
3653 StatefulIncrementalDecoderTest,
3654 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3655 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003656 CMiscIOTest, PyMiscIOTest,
3657 CSignalsTest, PySignalsTest,
3658 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003659
3660 # Put the namespaces of the IO module we are testing and some useful mock
3661 # classes in the __dict__ of each test.
3662 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003663 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003664 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3665 c_io_ns = {name : getattr(io, name) for name in all_members}
3666 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3667 globs = globals()
3668 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3669 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3670 # Avoid turning open into a bound method.
3671 py_io_ns["open"] = pyio.OpenWrapper
3672 for test in tests:
3673 if test.__name__.startswith("C"):
3674 for name, obj in c_io_ns.items():
3675 setattr(test, name, obj)
3676 elif test.__name__.startswith("Py"):
3677 for name, obj in py_io_ns.items():
3678 setattr(test, name, obj)
3679
Ezio Melottidaa42c72013-03-23 16:30:16 +02003680 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
3681 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003682
3683if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02003684 unittest.main()