blob: 2fb1b1e24a1ec911c6d8c29f4e28e314fd1a049d [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

12
################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Antoine Pitrou25f85d42015-04-13 19:41:47 +020038from test.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010047try:
48 import fcntl
49except ImportError:
50 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size.

    Opens this source file in text mode (decoded as latin-1) and returns the
    ``_CHUNK_SIZE`` attribute of the resulting file object.  The file is
    closed again before returning.
    """
    with open(__file__, "r", encoding="latin-1") as f:
        return f._CHUNK_SIZE
56
57
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    *read_stack* is an iterable of chunks handed out, in order, by
    readinto().  A ``None`` entry makes readinto() return ``None`` once.
    Everything passed to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0             # number of readinto() calls
        self._extraneous_reads = 0  # calls made after the stack ran dry

    def write(self, b):
        """Record a bytes copy of *b* and report it as fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the chunk at the head of the read stack into *buf*.

        Returns the number of bytes copied, ``None`` when the head entry is
        ``None`` (that entry is consumed), or 0 when the stack is empty.
        A chunk longer than *buf* is split; the remainder stays queued.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            # The whole chunk fits: consume it.
            del self._read_stack[0]
            buf[:n] = data
            return n
        # Chunk is larger than the buffer: fill the buffer and keep the rest.
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        return pos


class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the pure-Python RawIOBase."""
119
120
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead with an explicit read() that pops whole chunks."""

    def read(self, n=None):
        """Return the next queued chunk; *n* is ignored.

        Once the read stack is empty, returns b"" and counts the call as an
        extraneous read.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only "stack is empty" means end of data.  A bare except here
            # would also swallow KeyboardInterrupt and unrelated bugs.
            self._extraneous_reads += 1
            return b""


class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the pure-Python RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000136
Guido van Rossuma9e20242007-03-08 00:43:48 +0000137
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose return values do not match what happened.

    write() and read() double the parent's result, seek() and tell() return
    negative numbers, and readinto() reports five times the buffer length.
    """

    def write(self, b):
        return super().write(b) * 2

    def read(self, n=None):
        return super().read(n) * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Still perform the real read, then lie about how much was read.
        super().readinto(buf)
        return len(buf) * 5


class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the pure-Python RawIOBase."""
160
161
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() raises OSError.

    The ``closed`` flag is set before raising, so later close() calls do
    nothing.
    """

    closed = 0

    def close(self):
        if not self.closed:
            self.closed = 1
            raise OSError


class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the pure-Python RawIOBase."""
175
176
class MockFileIO:
    """Mixin that records the size of every read in ``read_history``.

    It is meant to be combined with a concrete stream class (see the C and
    Python variants below) whose __init__, read() and readinto() it wraps
    through super().
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        """Delegate to the parent read() and log len(result), or None."""
        res = super().read(n)
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        """Delegate to the parent readinto() and log its return value."""
        res = super().readinto(b)
        self.read_history.append(res)
        return res


class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO backed by the C BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO backed by the pure-Python BytesIO."""
198
199
class MockUnseekableIO:
    """Mixin that makes a stream report itself as not seekable.

    seek() and tell() raise ``self.UnsupportedOperation``; the concrete
    variants below bind that attribute to the matching implementation's
    exception class.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        raise self.UnsupportedOperation("not seekable")

    def tell(self, *args):
        raise self.UnsupportedOperation("not seekable")


class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """Unseekable stream backed by the C BytesIO."""

    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """Unseekable stream backed by the pure-Python BytesIO."""

    UnsupportedOperation = pyio.UnsupportedOperation
215
216
class MockNonBlockWriterIO:
    """Writable raw-stream mock that can pretend a write would block.

    Written data accumulates in ``_write_stack`` until pop_written() is
    called.  After block_on(char), a write containing *char* is cut short at
    that byte; a write that starts with *char* returns None and clears the
    blocker.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far as one bytes object and reset."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b*, honouring the current blocker if one is set.

        Returns the number of bytes accepted, or None to indicate that the
        write would block.
        """
        b = bytes(b)
        n = -1
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                # Blocker not present: fall through to a normal full write.
                pass
            else:
                if n > 0:
                    # write data up to the first blocker
                    self._write_stack.append(b[:n])
                    return n
                else:
                    # cancel blocker and indicate would block
                    self._blocker_char = None
                    return None
        self._write_stack.append(b)
        return len(b)


class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of the C RawIOBase."""

    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of the pure-Python RawIOBase."""

    BlockingIOError = pyio.BlockingIOError
266
Guido van Rossuma9e20242007-03-08 00:43:48 +0000267
Guido van Rossum28524c72007-02-27 05:47:44 +0000268class IOTest(unittest.TestCase):
269
Neal Norwitze7789b12008-03-24 06:18:09 +0000270 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000271 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000272
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000273 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000274 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000275
Guido van Rossum28524c72007-02-27 05:47:44 +0000276 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000277 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000278 f.truncate(0)
279 self.assertEqual(f.tell(), 5)
280 f.seek(0)
281
282 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000283 self.assertEqual(f.seek(0), 0)
284 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000285 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000286 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000287 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000288 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000289 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000290 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000291 self.assertEqual(f.seek(-1, 2), 13)
292 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293
Guido van Rossum87429772007-04-10 21:06:59 +0000294 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000295 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000296 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000297
Guido van Rossum9b76da62007-04-11 01:09:03 +0000298 def read_ops(self, f, buffered=False):
299 data = f.read(5)
300 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000301 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000302 self.assertEqual(f.readinto(data), 5)
303 self.assertEqual(data, b" worl")
304 self.assertEqual(f.readinto(data), 2)
305 self.assertEqual(len(data), 5)
306 self.assertEqual(data[:2], b"d\n")
307 self.assertEqual(f.seek(0), 0)
308 self.assertEqual(f.read(20), b"hello world\n")
309 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000310 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000311 self.assertEqual(f.seek(-6, 2), 6)
312 self.assertEqual(f.read(5), b"world")
313 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000314 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 self.assertEqual(f.seek(-6, 1), 5)
316 self.assertEqual(f.read(5), b" worl")
317 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000318 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000319 if buffered:
320 f.seek(0)
321 self.assertEqual(f.read(), b"hello world\n")
322 f.seek(6)
323 self.assertEqual(f.read(), b"world\n")
324 self.assertEqual(f.read(), b"")
325
Guido van Rossum34d69e52007-04-10 20:08:41 +0000326 LARGE = 2**31
327
Guido van Rossum53807da2007-04-10 19:01:47 +0000328 def large_file_ops(self, f):
329 assert f.readable()
330 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000331 self.assertEqual(f.seek(self.LARGE), self.LARGE)
332 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000333 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000334 self.assertEqual(f.tell(), self.LARGE + 3)
335 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000336 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000337 self.assertEqual(f.tell(), self.LARGE + 2)
338 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000339 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000340 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000341 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
342 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000343 self.assertEqual(f.read(2), b"x")
344
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000345 def test_invalid_operations(self):
346 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000347 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000348 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000349 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000350 self.assertRaises(exc, fp.read)
351 self.assertRaises(exc, fp.readline)
352 with self.open(support.TESTFN, "wb", buffering=0) as fp:
353 self.assertRaises(exc, fp.read)
354 self.assertRaises(exc, fp.readline)
355 with self.open(support.TESTFN, "rb", buffering=0) as fp:
356 self.assertRaises(exc, fp.write, b"blah")
357 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000358 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000359 self.assertRaises(exc, fp.write, b"blah")
360 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000361 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000362 self.assertRaises(exc, fp.write, "blah")
363 self.assertRaises(exc, fp.writelines, ["blah\n"])
364 # Non-zero seeking from current or end pos
365 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
366 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000367
Antoine Pitrou13348842012-01-29 18:36:34 +0100368 def test_open_handles_NUL_chars(self):
369 fn_with_NUL = 'foo\0bar'
370 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
371 self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
372
Guido van Rossum28524c72007-02-27 05:47:44 +0000373 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000374 with self.open(support.TESTFN, "wb", buffering=0) as f:
375 self.assertEqual(f.readable(), False)
376 self.assertEqual(f.writable(), True)
377 self.assertEqual(f.seekable(), True)
378 self.write_ops(f)
379 with self.open(support.TESTFN, "rb", buffering=0) as f:
380 self.assertEqual(f.readable(), True)
381 self.assertEqual(f.writable(), False)
382 self.assertEqual(f.seekable(), True)
383 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000384
Guido van Rossum87429772007-04-10 21:06:59 +0000385 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000386 with self.open(support.TESTFN, "wb") as f:
387 self.assertEqual(f.readable(), False)
388 self.assertEqual(f.writable(), True)
389 self.assertEqual(f.seekable(), True)
390 self.write_ops(f)
391 with self.open(support.TESTFN, "rb") as f:
392 self.assertEqual(f.readable(), True)
393 self.assertEqual(f.writable(), False)
394 self.assertEqual(f.seekable(), True)
395 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000396
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000397 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000398 with self.open(support.TESTFN, "wb") as f:
399 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
400 with self.open(support.TESTFN, "rb") as f:
401 self.assertEqual(f.readline(), b"abc\n")
402 self.assertEqual(f.readline(10), b"def\n")
403 self.assertEqual(f.readline(2), b"xy")
404 self.assertEqual(f.readline(4), b"zzy\n")
405 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000406 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000407 self.assertRaises(TypeError, f.readline, 5.3)
408 with self.open(support.TESTFN, "r") as f:
409 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000410
Guido van Rossum28524c72007-02-27 05:47:44 +0000411 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000412 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000413 self.write_ops(f)
414 data = f.getvalue()
415 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000416 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000417 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000418
Guido van Rossum53807da2007-04-10 19:01:47 +0000419 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000420 # On Windows and Mac OSX this test comsumes large resources; It takes
421 # a long time to build the >2GB file and takes >2GB of disk space
422 # therefore the resource must be enabled to run this test.
423 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600424 support.requires(
425 'largefile',
426 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000427 with self.open(support.TESTFN, "w+b", 0) as f:
428 self.large_file_ops(f)
429 with self.open(support.TESTFN, "w+b") as f:
430 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000431
432 def test_with_open(self):
433 for bufsize in (0, 1, 100):
434 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000435 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000436 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000437 self.assertEqual(f.closed, True)
438 f = None
439 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000440 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000441 1/0
442 except ZeroDivisionError:
443 self.assertEqual(f.closed, True)
444 else:
445 self.fail("1/0 didn't raise an exception")
446
Antoine Pitrou08838b62009-01-21 00:55:13 +0000447 # issue 5008
448 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000449 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000450 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000451 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000452 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000454 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000455 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000456 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000457
Guido van Rossum87429772007-04-10 21:06:59 +0000458 def test_destructor(self):
459 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000460 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000461 def __del__(self):
462 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000463 try:
464 f = super().__del__
465 except AttributeError:
466 pass
467 else:
468 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000469 def close(self):
470 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000471 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000472 def flush(self):
473 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000474 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000475 with support.check_warnings(('', ResourceWarning)):
476 f = MyFileIO(support.TESTFN, "wb")
477 f.write(b"xxx")
478 del f
479 support.gc_collect()
480 self.assertEqual(record, [1, 2, 3])
481 with self.open(support.TESTFN, "rb") as f:
482 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000483
484 def _check_base_destructor(self, base):
485 record = []
486 class MyIO(base):
487 def __init__(self):
488 # This exercises the availability of attributes on object
489 # destruction.
490 # (in the C version, close() is called by the tp_dealloc
491 # function, not by __del__)
492 self.on_del = 1
493 self.on_close = 2
494 self.on_flush = 3
495 def __del__(self):
496 record.append(self.on_del)
497 try:
498 f = super().__del__
499 except AttributeError:
500 pass
501 else:
502 f()
503 def close(self):
504 record.append(self.on_close)
505 super().close()
506 def flush(self):
507 record.append(self.on_flush)
508 super().flush()
509 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000510 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000511 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000512 self.assertEqual(record, [1, 2, 3])
513
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000514 def test_IOBase_destructor(self):
515 self._check_base_destructor(self.IOBase)
516
517 def test_RawIOBase_destructor(self):
518 self._check_base_destructor(self.RawIOBase)
519
520 def test_BufferedIOBase_destructor(self):
521 self._check_base_destructor(self.BufferedIOBase)
522
523 def test_TextIOBase_destructor(self):
524 self._check_base_destructor(self.TextIOBase)
525
Guido van Rossum87429772007-04-10 21:06:59 +0000526 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000527 with self.open(support.TESTFN, "wb") as f:
528 f.write(b"xxx")
529 with self.open(support.TESTFN, "rb") as f:
530 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000531
Guido van Rossumd4103952007-04-12 05:44:49 +0000532 def test_array_writes(self):
533 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000534 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000535 with self.open(support.TESTFN, "wb", 0) as f:
536 self.assertEqual(f.write(a), n)
537 with self.open(support.TESTFN, "wb") as f:
538 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000539
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000540 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000541 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000542 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000543
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000544 def test_read_closed(self):
545 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000546 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000547 with self.open(support.TESTFN, "r") as f:
548 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000549 self.assertEqual(file.read(), "egg\n")
550 file.seek(0)
551 file.close()
552 self.assertRaises(ValueError, file.read)
553
554 def test_no_closefd_with_filename(self):
555 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000556 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000557
558 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000559 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000560 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000561 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000562 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000563 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000564 self.assertEqual(file.buffer.raw.closefd, False)
565
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000566 def test_garbage_collection(self):
567 # FileIO objects are collected, and collecting them flushes
568 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000569 with support.check_warnings(('', ResourceWarning)):
570 f = self.FileIO(support.TESTFN, "wb")
571 f.write(b"abcxxx")
572 f.f = f
573 wr = weakref.ref(f)
574 del f
575 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000576 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000577 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000578 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000579
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000580 def test_unbounded_file(self):
581 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
582 zero = "/dev/zero"
583 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000584 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000585 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000586 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000587 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000588 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000589 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000590 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000591 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000592 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000593 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000594 self.assertRaises(OverflowError, f.read)
595
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200596 def check_flush_error_on_close(self, *args, **kwargs):
597 # Test that the file is closed despite failed flush
598 # and that flush() is called before file closed.
599 f = self.open(*args, **kwargs)
600 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000601 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200602 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200603 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000604 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200605 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600606 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200607 self.assertTrue(closed) # flush() called
608 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200609 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200610
611 def test_flush_error_on_close(self):
612 # raw file
613 # Issue #5700: io.FileIO calls flush() after file closed
614 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
615 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
616 self.check_flush_error_on_close(fd, 'wb', buffering=0)
617 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
618 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
619 os.close(fd)
620 # buffered io
621 self.check_flush_error_on_close(support.TESTFN, 'wb')
622 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
623 self.check_flush_error_on_close(fd, 'wb')
624 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
625 self.check_flush_error_on_close(fd, 'wb', closefd=False)
626 os.close(fd)
627 # text io
628 self.check_flush_error_on_close(support.TESTFN, 'w')
629 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
630 self.check_flush_error_on_close(fd, 'w')
631 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
632 self.check_flush_error_on_close(fd, 'w', closefd=False)
633 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000634
635 def test_multi_close(self):
636 f = self.open(support.TESTFN, "wb", buffering=0)
637 f.close()
638 f.close()
639 f.close()
640 self.assertRaises(ValueError, f.flush)
641
Antoine Pitrou328ec742010-09-14 18:37:24 +0000642 def test_RawIOBase_read(self):
643 # Exercise the default RawIOBase.read() implementation (which calls
644 # readinto() internally).
645 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
646 self.assertEqual(rawio.read(2), b"ab")
647 self.assertEqual(rawio.read(2), b"c")
648 self.assertEqual(rawio.read(2), b"d")
649 self.assertEqual(rawio.read(2), None)
650 self.assertEqual(rawio.read(2), b"ef")
651 self.assertEqual(rawio.read(2), b"g")
652 self.assertEqual(rawio.read(2), None)
653 self.assertEqual(rawio.read(2), b"")
654
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400655 def test_types_have_dict(self):
656 test = (
657 self.IOBase(),
658 self.RawIOBase(),
659 self.TextIOBase(),
660 self.StringIO(),
661 self.BytesIO()
662 )
663 for obj in test:
664 self.assertTrue(hasattr(obj, "__dict__"))
665
Ross Lagerwall59142db2011-10-31 20:34:46 +0200666 def test_opener(self):
667 with self.open(support.TESTFN, "w") as f:
668 f.write("egg\n")
669 fd = os.open(support.TESTFN, os.O_RDONLY)
670 def opener(path, flags):
671 return fd
672 with self.open("non-existent", "r", opener=opener) as f:
673 self.assertEqual(f.read(), "egg\n")
674
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200675 def test_fileio_closefd(self):
676 # Issue #4841
677 with self.open(__file__, 'rb') as f1, \
678 self.open(__file__, 'rb') as f2:
679 fileio = self.FileIO(f1.fileno(), closefd=False)
680 # .__init__() must not close f1
681 fileio.__init__(f2.fileno(), closefd=False)
682 f1.readline()
683 # .close() must not close f2
684 fileio.close()
685 f2.readline()
686
def test_nonbuffered_textio(self):
    # Opening a text file with buffering=0 must raise ValueError, and no
    # warning may be recorded even after a garbage collection pass.
    with warnings.catch_warnings(record=True) as caught_warnings:
        self.assertRaises(ValueError,
                          self.open, support.TESTFN, 'w', buffering=0)
        support.gc_collect()
    self.assertEqual(caught_warnings, [])
693
def test_invalid_newline(self):
    # An invalid newline argument must raise ValueError, and no warning
    # may be recorded even after a garbage collection pass.
    with warnings.catch_warnings(record=True) as caught_warnings:
        self.assertRaises(ValueError,
                          self.open, support.TESTFN, 'w', newline='invalid')
        support.gc_collect()
    self.assertEqual(caught_warnings, [])
700
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200701
class CIOTest(IOTest):
    # IOTest cases plus a finalization regression test (issue #12149).

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        # Make the instance reference itself so that only the cyclic
        # garbage collector can reclaim it.
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        # The weak reference must be dead once the cycle has been collected.
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000721
class PyIOTest(IOTest):
    """Inherits every IOTest case unchanged; adds no tests of its own."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000724
Guido van Rossuma9e20242007-03-08 00:43:48 +0000725
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    # Classes mixing this in provide ``tp`` (the buffered type under test)
    # and the raw-stream factories referenced below.

    def test_detach(self):
        # detach() returns the raw stream; a second detach() must fail.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        detached = buffered.detach()
        self.assertIs(detached, raw_stream)
        with self.assertRaises(ValueError):
            buffered.detach()

        repr(buffered)  # Should still work

    def test_fileno(self):
        # NOTE(review): 42 is presumably the descriptor reported by the
        # mock raw stream -- confirm against MockRawIO.
        buffered = self.tp(self.MockRawIO())
        self.assertEqual(42, buffered.fileno())

    @unittest.skip('test having existential crisis')
    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        buffered = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 9):
            self.assertRaises(ValueError, buffered.seek, 0, bad_whence)

    def test_override_destructor(self):
        # Record which overridden hooks run, and in what order, when the
        # object is destroyed: 1 = __del__, 2 = close(), 3 = flush().
        calls = []
        base_type = self.tp
        class MyBufferedIO(base_type):
            def __del__(self):
                calls.append(1)
                try:
                    inherited_del = super().__del__
                except AttributeError:
                    pass
                else:
                    inherited_del()
            def close(self):
                calls.append(2)
                super().close()
            def flush(self):
                calls.append(3)
                super().flush()
        buffered = MyBufferedIO(self.MockRawIO())
        is_writable = buffered.writable()
        del buffered
        support.gc_collect()
        # flush() is only expected in the record for writable objects.
        expected_calls = [1, 2, 3] if is_writable else [1, 2]
        self.assertEqual(calls, expected_calls)

    def test_context_manager(self):
        # Test usability as a context manager
        buffered = self.tp(self.MockRawIO())
        def enter_and_exit():
            with buffered:
                pass
        enter_and_exit()
        # The object should now be closed, and using it a second time
        # should raise a ValueError.
        with self.assertRaises(ValueError):
            enter_and_exit()

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        failing_raw = self.CloseFailureIO()
        def touch_missing_attribute():
            self.tp(failing_raw).xyzzy
        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        message = captured.getvalue().strip()
        if message:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(message.splitlines()), 1)
            self.assertTrue(message.startswith("Exception OSError: "), message)
            self.assertTrue(message.endswith(" ignored"), message)

    def test_repr(self):
        # repr() shows the qualified class name, plus the raw stream's
        # name once one has been set (str or bytes).
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(buffered), "<%s>" % clsname)
        raw_stream.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % clsname)
        raw_stream.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw_stream = self.MockRawIO()
        closed_at_flush = []
        def bad_flush():
            # Snapshot both "closed" flags at the moment flush() runs.
            closed_at_flush[:] = [buffered.closed, raw_stream.closed]
            raise OSError()
        raw_stream.flush = bad_flush
        buffered = self.tp(raw_stream)
        with self.assertRaises(OSError):  # exception not swallowed
            buffered.close()
        self.assertTrue(buffered.closed)
        self.assertTrue(raw_stream.closed)
        self.assertTrue(closed_at_flush)      # flush() called
        self.assertFalse(closed_at_flush[0])  # flush() called before file closed
        self.assertFalse(closed_at_flush[1])
        raw_stream.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and close() fail, the close() error propagates
        # with the flush() error attached as its __context__.
        raw_stream = self.MockRawIO()
        def bad_flush():
            raise OSError('flush')
        def bad_close():
            raise OSError('close')
        raw_stream.close = bad_close
        buffered = self.tp(raw_stream)
        buffered.flush = bad_flush
        with self.assertRaises(OSError) as caught:  # exception not swallowed
            buffered.close()
        close_error = caught.exception
        self.assertEqual(close_error.args, ('close',))
        self.assertIsInstance(close_error.__context__, OSError)
        self.assertEqual(close_error.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw_stream = self.MockRawIO()
        # Both helpers raise NameError by referencing undefined names.
        def bad_flush():
            raise non_existing_flush
        def bad_close():
            raise non_existing_close
        raw_stream.close = bad_close
        buffered = self.tp(raw_stream)
        buffered.flush = bad_flush
        with self.assertRaises(NameError) as caught:  # exception not swallowed
            buffered.close()
        close_error = caught.exception
        self.assertIn('non_existing_close', str(close_error))
        self.assertIsInstance(close_error.__context__, NameError)
        self.assertIn('non_existing_flush', str(close_error.__context__))
        self.assertFalse(buffered.closed)

    def test_multi_close(self):
        # Repeated close() calls are harmless; flush() afterwards fails.
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        # tell() and seek() on an unseekable raw stream are unsupported.
        buffered = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            buffered.tell()
        with self.assertRaises(self.UnsupportedOperation):
            buffered.seek(0)

    def test_readonly_attributes(self):
        # The ``raw`` attribute cannot be rebound.
        buffered = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
891
Guido van Rossum78892e42007-04-06 17:31:18 +0000892
class SizeofTest:
    # sys.getsizeof() checks for buffered objects.

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must grow by exactly the difference between
        # the two buffer sizes.
        small_buffer, large_buffer = 4096, 8192
        small = self.tp(self.MockRawIO(), buffer_size=small_buffer)
        base_size = sys.getsizeof(small) - small_buffer
        large = self.tp(self.MockRawIO(), buffer_size=large_buffer)
        self.assertEqual(sys.getsizeof(large), base_size + large_buffer)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the reported size must no longer include the buffer.
        buffer_size = 4096
        buffered = self.tp(self.MockRawIO(), buffer_size=buffer_size)
        base_size = sys.getsizeof(buffered) - buffer_size
        buffered.close()
        self.assertEqual(sys.getsizeof(buffered), base_size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200914
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Reader tests; concrete subclasses supply the buffered type under
    # test as ``tp``.
    read_mode = "rb"

    def test_constructor(self):
        # Re-running __init__() with valid arguments keeps the object
        # usable; a non-positive buffer_size must raise ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # Dropping an object that never ran __init__() must be harmless.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        # Using it before __init__() must fail with one of these errors.
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        # read(None) and read(7) must both return all seven bytes.
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args (checked on the object left over from the loop)
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # rawio._reads is compared after each call to check how many raw
        # reads have happened so far.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        # readinto() returns the number of bytes stored; bytes of the
        # target beyond that count keep their previous content.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")
        # Same check with a raw stream that yields None after its data.
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

    def test_readlines(self):
        # Each call gets a fresh reader over the same three chunks.
        def bufio():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes passed to bufio.read(), and the
        # raw read sizes expected in rawio.read_history.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # readall() on the raw stream itself behaves the same way.
        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        # read() without a size returns everything.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            s = bytes(values)
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        # Record the failure for the main thread, then let
                        # it propagate in the worker as before.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                s = b''.join(results)
                # Every byte value must have been read exactly N times.
                for i in range(256):
                    c = bytes([i])
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # tell()/seek() must stay unsupported both before and after a read.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        # seek() and tell() over a misbehaving raw stream must raise OSError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1117
1118
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the reader and sizeof tests against io.BufferedReader and adds
    # checks specific to it.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__() call, read() must raise ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading differently, so
        # that checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends up in gc.garbage instead.
        # The test creates TESTFN; make sure it is removed afterwards.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            # Self-reference so that only the cyclic GC can reclaim it.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1165
1166
class PyBufferedReaderTest(BufferedReaderTest):
    """Runs the inherited BufferedReaderTest cases on pyio.BufferedReader."""

    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001169
Guido van Rossuma9e20242007-03-08 00:43:48 +00001170
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001171class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1172 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001173
def test_constructor(self):
    # Re-running __init__() with valid arguments keeps the writer usable;
    # a non-positive buffer_size must raise ValueError.
    raw_stream = self.MockRawIO()
    writer = self.tp(raw_stream)
    writer.__init__(raw_stream)
    for valid_size in (1024, 16):
        writer.__init__(raw_stream, buffer_size=valid_size)
    self.assertEqual(3, writer.write(b"abc"))
    writer.flush()
    for invalid_size in (0, -16, -1):
        with self.assertRaises(ValueError):
            writer.__init__(raw_stream, buffer_size=invalid_size)
    writer.__init__(raw_stream)
    self.assertEqual(3, writer.write(b"ghi"))
    writer.flush()
    everything_written = b"".join(raw_stream._write_stack)
    self.assertEqual(everything_written, b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001189
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001190 def test_uninitialized(self):
1191 bufio = self.tp.__new__(self.tp)
1192 del bufio
1193 bufio = self.tp.__new__(self.tp)
1194 self.assertRaisesRegex((ValueError, AttributeError),
1195 'uninitialized|has no attribute',
1196 bufio.write, b'')
1197 bufio.__init__(self.MockRawIO())
1198 self.assertEqual(bufio.write(b''), 0)
1199
def test_detach_flush(self):
    # Buffered data must reach the raw stream when detach() is called.
    raw_stream = self.MockRawIO()
    writer = self.tp(raw_stream)
    writer.write(b"howdy!")
    # Nothing has reached the raw stream yet.
    self.assertFalse(raw_stream._write_stack)
    writer.detach()
    self.assertEqual(raw_stream._write_stack, [b"howdy!"])
1207
def test_write(self):
    # Write to the buffered IO but don't overflow the buffer.
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 8)
    buffered.write(b"abc")
    # The three bytes fit in the buffer, so the raw stream saw nothing.
    self.assertFalse(raw_stream._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001214
def test_write_overflow(self):
    # Write 16 bytes in 3-byte slices through an 8-byte buffer.
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 8)
    contents = b"abcdefghijklmnop"
    offset = 0
    while offset < len(contents):
        buffered.write(contents[offset:offset + 3])
        offset += 3
    flushed = b"".join(raw_stream._write_stack)
    # At least (total - 8) bytes were implicitly flushed, perhaps more
    # depending on the implementation.
    required_prefix = contents[:-8]
    self.assertTrue(flushed.startswith(required_prefix), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001225
def check_writes(self, intermediate_func):
    # Lots of writes, test the flushed output is as expected.
    # intermediate_func(bufio) is called after every individual write.
    contents = bytes(range(256)) * 1000
    total = len(contents)
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 13)
    # Write sizes: repeat each N 15 times then proceed to N+1
    write_sizes = (size for size in count(1) for _ in range(15))
    offset = 0
    while offset < total:
        # Never write past the end of the payload.
        chunk_len = min(next(write_sizes), total - offset)
        chunk = contents[offset:offset + chunk_len]
        self.assertEqual(buffered.write(chunk), chunk_len)
        intermediate_func(buffered)
        offset += chunk_len
    buffered.flush()
    self.assertEqual(contents, b"".join(raw_stream._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001245
def test_writes(self):
    # Plain run of check_writes(): nothing happens between the writes.
    def do_nothing(bufio):
        return None
    self.check_writes(do_nothing)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001248
def test_writes_and_flushes(self):
    # Run check_writes() with an explicit flush() after every write.
    def flush_after_write(bufio):
        return bufio.flush()
    self.check_writes(flush_after_write)
Guido van Rossum01a27522007-03-07 01:00:12 +00001251
def test_writes_and_seeks(self):
    # Run check_writes() twice, seeking around the current position after
    # every write and always ending back where the write left off.
    def seek_absolute(bufio):
        here = bufio.tell()
        for target in (here + 1, here - 1, here):
            bufio.seek(target, 0)
    self.check_writes(seek_absolute)

    def seek_relative(bufio):
        here = bufio.seek(0, 1)
        for delta in (+1, -1):
            bufio.seek(delta, 1)
        bufio.seek(here, 0)
    self.check_writes(seek_relative)
Guido van Rossum01a27522007-03-07 01:00:12 +00001265
def test_writes_and_truncates(self):
    # Run check_writes(), truncating at the current position after every
    # write.
    def truncate_here(bufio):
        return bufio.truncate(bufio.tell())
    self.check_writes(truncate_here)
Guido van Rossum01a27522007-03-07 01:00:12 +00001268
def test_write_non_blocking(self):
    nonblocking_raw = self.MockNonBlockWriterIO()
    writer = self.tp(nonblocking_raw, 8)

    for chunk in (b"abcd", b"efghi"):
        self.assertEqual(writer.write(chunk), len(chunk))
    # 1 byte will be written, the rest will be buffered
    nonblocking_raw.block_on(b"k")
    self.assertEqual(writer.write(b"jklmn"), 5)

    # 8 bytes will be written, 8 will be buffered and the rest will be lost
    nonblocking_raw.block_on(b"0")
    try:
        writer.write(b"opqrwxyz0123456789")
    except self.BlockingIOError as blocked:
        written = blocked.characters_written
    else:
        self.fail("BlockingIOError should have been raised")
    self.assertEqual(written, 16)
    self.assertEqual(nonblocking_raw.pop_written(),
                     b"abcdefghijklmnopqrwxyz")

    self.assertEqual(writer.write(b"ABCDEFGHI"), 9)
    flushed = nonblocking_raw.pop_written()
    # Previously buffered bytes were flushed
    self.assertTrue(flushed.startswith(b"01234567A"), flushed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001295
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001296 def test_write_and_rewind(self):
1297 raw = io.BytesIO()
1298 bufio = self.tp(raw, 4)
1299 self.assertEqual(bufio.write(b"abcdef"), 6)
1300 self.assertEqual(bufio.tell(), 6)
1301 bufio.seek(0, 0)
1302 self.assertEqual(bufio.write(b"XY"), 2)
1303 bufio.seek(6, 0)
1304 self.assertEqual(raw.getvalue(), b"XYcdef")
1305 self.assertEqual(bufio.write(b"123456"), 6)
1306 bufio.flush()
1307 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001308
def test_flush(self):
    # flush() pushes the buffered bytes down to the raw stream.
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 8)
    buffered.write(b"abc")
    buffered.flush()
    first_raw_write = raw_stream._write_stack[0]
    self.assertEqual(b"abc", first_raw_write)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001315
def test_writelines(self):
    # writelines() accepts a list of bytes objects.
    lines = [b'ab', b'cd', b'ef']
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 8)
    buffered.writelines(lines)
    buffered.flush()
    written = b''.join(raw_stream._write_stack)
    self.assertEqual(written, b'abcdef')
1323
def test_writelines_userlist(self):
    # writelines() also accepts a UserList of bytes objects.
    lines = UserList([b'ab', b'cd', b'ef'])
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 8)
    buffered.writelines(lines)
    buffered.flush()
    written = b''.join(raw_stream._write_stack)
    self.assertEqual(written, b'abcdef')
1331
1332 def test_writelines_error(self):
1333 writer = self.MockRawIO()
1334 bufio = self.tp(writer, 8)
1335 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1336 self.assertRaises(TypeError, bufio.writelines, None)
1337 self.assertRaises(TypeError, bufio.writelines, 'abc')
1338
def test_destructor(self):
    # Buffered data must reach the raw stream when the writer is destroyed.
    raw_stream = self.MockRawIO()
    buffered = self.tp(raw_stream, 8)
    buffered.write(b"abc")
    del buffered
    support.gc_collect()
    first_raw_write = raw_stream._write_stack[0]
    self.assertEqual(b"abc", first_raw_write)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001346
def test_truncate(self):
    # Truncate implicitly flushes the buffer.
    with self.open(support.TESTFN, self.write_mode, buffering=0) as raw_file:
        buffered = self.tp(raw_file, 8)
        buffered.write(b"abcdef")
        new_size = buffered.truncate(3)
        self.assertEqual(new_size, 3)
        # The position is expected to stay at 6 after truncating to 3.
        self.assertEqual(buffered.tell(), 6)
    with self.open(support.TESTFN, "rb", buffering=0) as reader:
        remaining = reader.read()
        self.assertEqual(remaining, b"abc")
1356
@unittest.skipUnless(threading, 'Threading required for this test.')
@support.requires_resource('cpu')
def test_threads(self):
    try:
        # Write out many bytes from many threads and test they were
        # all flushed.
        N = 1000
        payload = bytes(range(256)) * N
        chunk_sizes = cycle([1, 19])
        pending = deque()
        offset = 0
        # Slice the payload into chunks of alternating size 1 and 19.
        while offset < len(payload):
            step = next(chunk_sizes)
            pending.append(payload[offset:offset + step])
            offset += step
        del payload
        # We use a real file object because it allows us to
        # exercise situations where the GIL is released before
        # writing the buffer to the raw streams. This is in addition
        # to concurrency issues due to switching threads in the middle
        # of Python code.
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []
            def worker():
                try:
                    while True:
                        try:
                            chunk = pending.popleft()
                        except IndexError:
                            # Queue drained: this worker is done.
                            return
                        bufio.write(chunk)
                except Exception as exc:
                    errors.append(exc)
                    raise
            threads = [threading.Thread(target=worker) for _ in range(20)]
            with support.start_threads(threads):
                time.sleep(0.02) # yield
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            bufio.close()
        with self.open(support.TESTFN, "rb") as f:
            written = f.read()
        # Every byte value must appear exactly N times in the file.
        for byte_value in range(256):
            self.assertEqual(written.count(bytes([byte_value])), N)
    finally:
        support.unlink(support.TESTFN)
1404
def test_misbehaved_io(self):
    # seek(), tell() and write() on top of a MisbehavedRawIO stream are
    # each expected to fail with OSError.
    bufio = self.tp(self.MisbehavedRawIO(), 5)
    with self.assertRaises(OSError):
        bufio.seek(0)
    with self.assertRaises(OSError):
        bufio.tell()
    with self.assertRaises(OSError):
        bufio.write(b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001411
def test_max_buffer_size_removal(self):
    # A third positional argument (the former max_buffer_size, going by
    # the test name) is rejected with TypeError.
    with self.assertRaises(TypeError):
        raw = self.MockRawIO()
        self.tp(raw, 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001415
def test_write_error_on_close(self):
    # When the raw write fails during close(), the error must propagate
    # and the buffered object must still end up closed.
    def failing_write(b):
        raise OSError()

    raw = self.MockRawIO()
    raw.write = failing_write
    buffered = self.tp(raw)
    buffered.write(b'spam')
    with self.assertRaises(OSError):  # exception not swallowed
        buffered.close()
    self.assertTrue(buffered.closed)
1425
Benjamin Peterson59406a92009-03-26 17:10:29 +00001426
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriterTest cases against io.BufferedWriter
    # and adds checks specific to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel, so only check wider builds.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        too_big_errors = (OverflowError, MemoryError, ValueError)
        with self.assertRaises(too_big_errors):
            bufio.__init__(rawio, sys.maxsize)

    def test_initialization(self):
        # Re-initialising with a non-positive buffer size fails, and the
        # object then refuses writes.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends up in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            writer = self.tp(rawio)
            writer.write(b"123xxx")
            # Put the writer in a reference cycle so that only the cyclic
            # collector can reclaim it.
            writer.x = writer
            wr = weakref.ref(writer)
            del writer
            support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as reader:
            on_disk = reader.read()
            self.assertEqual(on_disk, b"123xxx")

    def test_args_error(self):
        # Issue #17275
        self.assertRaisesRegex(TypeError, "BufferedWriter",
                               self.tp, io.BytesIO(), 1024, 1024, 1024)
1470
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001471
class PyBufferedWriterTest(BufferedWriterTest):
    # Binds the shared BufferedWriterTest cases to pyio.BufferedWriter, the
    # Python-side counterpart of the C class (see the note to test writers
    # at the top of this file).
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001474
class BufferedRWPairTest(unittest.TestCase):
    # Implementation-independent checks for BufferedRWPair.  The class under
    # test is read from ``self.tp``; the helpers MockRawIO, BytesIO and
    # UnsupportedOperation are also looked up on ``self`` and are not
    # defined in this class.

    def test_constructor(self):
        reader, writer = self.MockRawIO(), self.MockRawIO()
        pair = self.tp(reader, writer)
        self.assertFalse(pair.closed)

    def test_uninitialized(self):
        # An instance that never ran __init__ can be discarded...
        discarded = self.tp.__new__(self.tp)
        del discarded
        # ...and reports a clear error when used...
        pair = self.tp.__new__(self.tp)
        errors = (ValueError, AttributeError)
        message = 'uninitialized|has no attribute'
        self.assertRaisesRegex(errors, message, pair.read, 0)
        self.assertRaisesRegex(errors, message, pair.write, b'')
        # ...but works normally once __init__ has been called.
        pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(pair.read(0), b'')
        self.assertEqual(pair.write(b''), 0)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        with self.assertRaises(self.UnsupportedOperation):
            pair.detach()

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            reader, writer = self.MockRawIO(), self.MockRawIO()
            self.tp(reader, writer, 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        unreadable = NotReadable()
        writer = self.MockRawIO()
        with self.assertRaises(OSError):
            self.tp(unreadable, writer)

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        reader = self.MockRawIO()
        unwritable = NotWriteable()
        with self.assertRaises(OSError):
            self.tp(reader, unwritable)

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        for size, expected in ((3, b"abc"), (1, b"d")):
            self.assertEqual(pair.read(size), expected)
        self.assertEqual(pair.read(), b"ef")
        # read(None) reads everything, like read() without an argument.
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        def fresh_pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        every_line = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(fresh_pair().readlines(), every_line)
        self.assertEqual(fresh_pair().readlines(), every_line)
        self.assertEqual(fresh_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        chunk = pair.read1(3)
        self.assertEqual(chunk, b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        target = bytearray(5)
        count = pair.readinto(target)
        self.assertEqual(count, 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), sink)

        # Each write is flushed separately, so it arrives as its own chunk.
        for chunk in (b"abc", b"def"):
            pair.write(chunk)
            pair.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        peeked = pair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        # Peeking did not consume anything.
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_reader_close_error_on_close(self):
        # The reader's close() fails (NameError from an undefined name); the
        # writer must be closed regardless and the error must propagate.
        def failing_close():
            reader_non_existing

        reader = self.MockRawIO()
        reader.close = failing_close
        writer = self.MockRawIO()
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            pair.close()
        self.assertIn('reader_non_existing', str(caught.exception))
        self.assertTrue(pair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

    def test_writer_close_error_on_close(self):
        # The writer's close() fails; the reader must be closed regardless
        # and the error must propagate.
        def failing_close():
            writer_non_existing

        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = failing_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            pair.close()
        self.assertIn('writer_non_existing', str(caught.exception))
        self.assertFalse(pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

    def test_reader_writer_close_error_on_close(self):
        # Both close() calls fail: the reader's error is the one raised and
        # the writer's error is chained to it as __context__.
        def failing_reader_close():
            reader_non_existing

        def failing_writer_close():
            writer_non_existing

        reader = self.MockRawIO()
        reader.close = failing_reader_close
        writer = self.MockRawIO()
        writer.close = failing_writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            pair.close()
        raised = caught.exception
        self.assertIn('reader_non_existing', str(raised))
        self.assertIsInstance(raised.__context__, NameError)
        self.assertIn('writer_non_existing', str(raised.__context__))
        self.assertFalse(pair.closed)
        self.assertFalse(reader.closed)
        self.assertFalse(writer.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # (reader is a tty, writer is a tty, expected pair.isatty() truth)
        scenarios = (
            (False, False, False),
            (True, False, True),
            (False, True, True),
            (True, True, True),
        )
        for reader_tty, writer_tty, expected in scenarios:
            pair = self.tp(SelectableIsAtty(reader_tty),
                           SelectableIsAtty(writer_tty))
            check = self.assertTrue if expected else self.assertFalse
            check(pair.isatty())

    def test_weakref_clearing(self):
        brw = self.tp(self.MockRawIO(), self.MockRawIO())
        ref = weakref.ref(brw)
        # Drop the pair first, then the weak reference to it.
        del brw
        del ref  # Shouldn't segfault.
1658
class CBufferedRWPairTest(BufferedRWPairTest):
    # Binds the shared BufferedRWPairTest cases to io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001661
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Binds the shared BufferedRWPairTest cases to pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001664
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001665
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared checks for BufferedRandom: everything inherited from the reader
    # and writer test cases, plus tests that mix reads, writes and seeks on
    # a single object.  Files are opened in update modes so both directions
    # work.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_constructor(self)

    def test_uninitialized(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        for chunk in (b"ddd", b"eee"):
            rw.write(chunk)
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        # Seek relative to the end, then relative to the current position.
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # Float offsets are rejected.
        with self.assertRaises(TypeError):
            rw.seek(0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario for the test_flush_and_* tests.  read_func is
        # called as read_func(bufio[, n]) and must return the bytes read.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Change the raw stream behind the buffer's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # A negative n means "read everything": use a large buffer.
            if n >= 0:
                capacity = n
            else:
                capacity = 9999
            target = bytearray(capacity)
            count = bufio.readinto(target)
            return bytes(target[:count])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        for chunk in (b"123", b"45"):
            bufio.write(chunk)
            bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_threads(self)

    def test_writes_and_peek(self):
        def _peek_in_place(bufio):
            bufio.peek(1)
        self.check_writes(_peek_in_place)

        def _peek_one_back(bufio):
            # Step back one byte, peek, then restore the position.
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)
        self.check_writes(_peek_one_back)

    def test_writes_and_reads(self):
        def _reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_reread_last_byte)

    def test_writes_and_read1s(self):
        def _reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_reread_last_byte)

    def test_writes_and_readintos(self):
        def _reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_reread_last_byte)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            end_of_write = overwrite_size + 1
            self.assertEqual(bufio.tell(), end_of_write)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), end_of_write)
            expected = (b"A" + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            self.assertEqual(raw.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for i in range(size):
            for j in range(i, size):
                raw = self.BytesIO(original)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # The later write (at i) wins when i == j.
                expected = bytearray(original)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            one_byte = bytearray(1)
            f.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Alternate one-byte writes with readline() calls; each write must
        # land where the previous read stopped.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as f:
            steps = ((b'1', b'b\n'), (b'2', b'def\n'), (b'3', b'\n'))
            for to_write, rest_of_line in steps:
                f.write(to_write)
                self.assertEqual(f.readline(), rest_of_line)
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1893
R David Murray67bfe802013-02-23 21:51:05 -05001894
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Runs the shared BufferedRandomTest cases against io.BufferedRandom and
    # adds checks specific to that implementation.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel, so only check wider builds.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        too_big_errors = (OverflowError, MemoryError, ValueError)
        with self.assertRaises(too_big_errors):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        self.assertRaisesRegex(TypeError, "BufferedRandom",
                               self.tp, io.BytesIO(), 1024, 1024, 1024)
1916
1917
class PyBufferedRandomTest(BufferedRandomTest):
    # Binds the shared BufferedRandomTest cases to pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1920
1921
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001922# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1923# properties:
1924# - A single output character can correspond to many bytes of input.
1925# - The number of input bytes to complete the character can be
1926# undetermined until the last input byte is received.
1927# - The number of input bytes can vary depending on previous input.
1928# - A single input byte can correspond to many characters of output.
1929# - The number of output characters can be undetermined until the
1930# last input byte is received.
1931# - The number of output characters can vary depending on previous input.
1932
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A new I or O applies to the input
    that follows the word which set it.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered input as bytes, I and O packed into one int)."""
        # XOR with 1 so that the packed flags are 0 right after reset().
        i, o = self.i ^ 1, self.o ^ 1
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Decode *input* (an iterable of byte values) and return a str.

        Bytes of an unfinished word stay in self.buffer until a later call
        completes the word, or until *final* is true.
        """
        pieces = []
        for b in input:
            # self.i is re-read for every byte because process_word() may
            # change it in the middle of the input.
            if self.i == 0:  # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i...' and 'o...' words update I and O and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            # Pad short words with hyphens up to O characters.
            output = self.buffer.decode('ascii').ljust(self.o, '-')
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The codec lookup below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the name 'test_decoder'.

        Returns a CodecInfo that uses this class as incremental decoder, or
        None when the codec is disabled or *name* is a different codec.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
2017
# Register the previous decoder for testing, as a codec search function
# answering to the name 'test_decoder'.
# Disabled by default (codecEnabled is False), tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2021
2022
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Sanity checks for the StatefulIncrementalDecoder test helper itself.
    """

    # Each entry is (input bytes, final flag passed to decode, expected text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.')
        ]

    def test_decoder(self):
        # Try a few one-shot test cases, each with a fresh decoder.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            decoded = decoder.decode(data, final)
            self.assertEqual(decoded, expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002065
2066class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002067
def setUp(self):
    # Sample data with mixed line endings, and the same text with every
    # line ending normalised to "\n".
    mixed_newlines = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    unix_newlines = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.testdata = mixed_newlines
    self.normalized = unix_newlines.decode("ascii")
    # Start every test without a leftover scratch file.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002072
def tearDown(self):
    # Remove the scratch file so it does not leak into the next test.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002075
def test_constructor(self):
    # __init__ may be called again on an existing wrapper; the new settings
    # replace the old ones, and invalid newline values are rejected.
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    text.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(text.encoding, "latin-1")
    self.assertEqual(text.line_buffering, False)

    text.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(text.encoding, "utf-8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())

    with self.assertRaises(TypeError):
        text.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        text.__init__(buffered, newline='xyzzy')
2089
def test_uninitialized(self):
    # An instance that never ran __init__ can be discarded...
    discarded = self.TextIOWrapper.__new__(self.TextIOWrapper)
    del discarded
    # ...and fails cleanly when used...
    text = self.TextIOWrapper.__new__(self.TextIOWrapper)
    with self.assertRaises(Exception):
        repr(text)
    read = text.read
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        read(0)
    # ...but works normally once __init__ has been called.
    text.__init__(self.MockRawIO())
    self.assertEqual(text.read(0), '')
2100
def test_non_text_encoding_codecs_are_rejected(self):
    # Ensure the constructor complains if passed a codec that isn't
    # marked as a text encoding
    # http://bugs.python.org/issue20404
    buffered = self.BufferedWriter(self.BytesIO())
    self.assertRaisesRegex(LookupError, "is not a text encoding",
                           self.TextIOWrapper, buffered, encoding="hex")
2109
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    # detach() hands back the underlying buffered stream.
    wrapper = self.TextIOWrapper(buffered)
    self.assertIs(wrapper.detach(), buffered)

    # Pending text is flushed down to the raw stream on detach(), and a
    # second detach() is an error.
    wrapper = self.TextIOWrapper(buffered, encoding="ascii")
    wrapper.write("howdy")
    self.assertFalse(raw.getvalue())
    wrapper.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    with self.assertRaises(ValueError):
        wrapper.detach()

    # Operations independent of the detached stream should still work
    repr(wrapper)
    self.assertEqual(wrapper.encoding, "ascii")
    self.assertEqual(wrapper.errors, "strict")
    self.assertFalse(wrapper.line_buffering)
2128
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002129 def test_repr(self):
2130 raw = self.BytesIO("hello".encode("utf-8"))
2131 b = self.BufferedReader(raw)
2132 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002133 modname = self.TextIOWrapper.__module__
2134 self.assertEqual(repr(t),
2135 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2136 raw.name = "dummy"
2137 self.assertEqual(repr(t),
2138 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002139 t.mode = "r"
2140 self.assertEqual(repr(t),
2141 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002142 raw.name = b"dummy"
2143 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002144 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002145
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002146 t.buffer.detach()
2147 repr(t) # Should not raise an exception
2148
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002149 def test_line_buffering(self):
2150 r = self.BytesIO()
2151 b = self.BufferedWriter(r, 1000)
2152 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002153 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002154 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002155 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002156 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002157 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002158 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002159
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002160 def test_default_encoding(self):
2161 old_environ = dict(os.environ)
2162 try:
2163 # try to get a user preferred encoding different than the current
2164 # locale encoding to check that TextIOWrapper() uses the current
2165 # locale encoding and not the user preferred encoding
2166 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2167 if key in os.environ:
2168 del os.environ[key]
2169
2170 current_locale_encoding = locale.getpreferredencoding(False)
2171 b = self.BytesIO()
2172 t = self.TextIOWrapper(b)
2173 self.assertEqual(t.encoding, current_locale_encoding)
2174 finally:
2175 os.environ.clear()
2176 os.environ.update(old_environ)
2177
@support.cpython_only
def test_device_encoding(self):
    # Issue 15989
    # A fileno() just past INT_MAX or UINT_MAX must make the constructor
    # raise OverflowError.
    import _testcapi
    raw = self.BytesIO()
    for too_big in (_testcapi.INT_MAX + 1, _testcapi.UINT_MAX + 1):
        raw.fileno = lambda value=too_big: value
        with self.assertRaises(OverflowError):
            self.TextIOWrapper(raw)
2187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002188 def test_encoding(self):
2189 # Check the encoding attribute is always set, and valid
2190 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002191 t = self.TextIOWrapper(b, encoding="utf-8")
2192 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002193 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002194 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002195 codecs.lookup(t.encoding)
2196
2197 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002198 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002199 b = self.BytesIO(b"abc\n\xff\n")
2200 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002201 self.assertRaises(UnicodeError, t.read)
2202 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002203 b = self.BytesIO(b"abc\n\xff\n")
2204 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002205 self.assertRaises(UnicodeError, t.read)
2206 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002207 b = self.BytesIO(b"abc\n\xff\n")
2208 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002209 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002210 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002211 b = self.BytesIO(b"abc\n\xff\n")
2212 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002213 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002214
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002215 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002216 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002217 b = self.BytesIO()
2218 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002219 self.assertRaises(UnicodeError, t.write, "\xff")
2220 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002221 b = self.BytesIO()
2222 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002223 self.assertRaises(UnicodeError, t.write, "\xff")
2224 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002225 b = self.BytesIO()
2226 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002227 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002228 t.write("abc\xffdef\n")
2229 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002230 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002231 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002232 b = self.BytesIO()
2233 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002234 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002235 t.write("abc\xffdef\n")
2236 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002237 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002238
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002239 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002240 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2241
2242 tests = [
2243 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002244 [ '', input_lines ],
2245 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2246 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2247 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002248 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002249 encodings = (
2250 'utf-8', 'latin-1',
2251 'utf-16', 'utf-16-le', 'utf-16-be',
2252 'utf-32', 'utf-32-le', 'utf-32-be',
2253 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002254
Guido van Rossum8358db22007-08-18 21:39:55 +00002255 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002256 # character in TextIOWrapper._pending_line.
2257 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002258 # XXX: str.encode() should return bytes
2259 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002260 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002261 for bufsize in range(1, 10):
2262 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002263 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2264 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002265 encoding=encoding)
2266 if do_reads:
2267 got_lines = []
2268 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002269 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002270 if c2 == '':
2271 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002272 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002273 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002274 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002275 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002276
2277 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002278 self.assertEqual(got_line, exp_line)
2279 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002280
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002281 def test_newlines_input(self):
2282 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002283 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2284 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002285 (None, normalized.decode("ascii").splitlines(keepends=True)),
2286 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002287 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2288 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2289 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002290 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002291 buf = self.BytesIO(testdata)
2292 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002293 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002294 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002295 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002296
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002297 def test_newlines_output(self):
2298 testdict = {
2299 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2300 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2301 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2302 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2303 }
2304 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2305 for newline, expected in tests:
2306 buf = self.BytesIO()
2307 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2308 txt.write("AAA\nB")
2309 txt.write("BB\nCCC\n")
2310 txt.write("X\rY\r\nZ")
2311 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002312 self.assertEqual(buf.closed, False)
2313 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002314
def test_destructor(self):
    # Dropping the wrapper must close the underlying stream, and the text
    # written earlier must already be in it when close() runs.
    seen_at_close = []
    base = self.BytesIO

    class RecordingBytesIO(base):
        def close(self):
            seen_at_close.append(self.getvalue())
            base.close(self)

    raw = RecordingBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002328
def test_override_destructor(self):
    # Overridden __del__, close() and flush() must each run exactly once,
    # in that order, when the object is destroyed.
    calls = []

    class RecordingTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base class may or may not define __del__.
            inherited = getattr(super(), "__del__", None)
            if inherited is not None:
                inherited()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = RecordingTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2351
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    report = captured.getvalue().strip()
    if not report:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(report.splitlines()), 1)
    self.assertTrue(report.startswith("Exception OSError: "), report)
    self.assertTrue(report.endswith(" ignored"), report)
Guido van Rossum8358db22007-08-18 21:39:55 +00002366
Guido van Rossum9b76da62007-04-11 01:09:03 +00002367 # Systematic tests of the text I/O API
2368
def test_basic_io(self):
    # Write, read back, seek and tell through a real file for a range of
    # chunk sizes and encodings.  Both files are opened with ``with`` so
    # the handle is released even when an assertion fails part-way.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)

            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Remember the end-of-file position for the checks below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2397
def multi_line_test(self, f, enc):
    # Rewrite *f* from scratch with lines of assorted lengths, noting
    # tell() before each write, then read everything back and check that
    # the (position, line) pairs are the same.  *enc* is not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of *size*.
        body = "".join(sample[i % len(sample)] for i in range(size))
        line = body + "\n"
        written.append((f.tell(), line))
        f.write(line)

    f.seek(0)
    read_back = []
    while True:
        position = f.tell()
        line = f.readline()
        if not line:
            break
        read_back.append((position, line))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002419
def test_telling(self):
    # tell() must report the same positions while reading as it did while
    # writing.  The file is opened with ``with`` so it is closed even when
    # an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to be refused while iterating.
            self.assertRaises(OSError, f.tell)
        # Once the iteration is exhausted tell() works again.
        self.assertEqual(f.tell(), p2)
2439
def test_seeking(self):
    # Each line is an ASCII prefix two characters short of the default
    # chunk size, followed by a multi-byte character and a newline.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    path = support.TESTFN
    with self.open(path, "wb") as out:
        out.write(line*2)
    with self.open(path, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002456
def test_seeking_too(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    path = support.TESTFN
    with self.open(path, "wb") as out:
        out.write(data)
    with self.open(path, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        # Passing means neither call raises; no return values are checked.
        f.readline()
        f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002467
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened with ``with`` so that a failing assertion
        # cannot leave a handle open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(payload)

        # Position each test case so that it crosses a chunk boundary.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(payload)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + payload, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002514
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002515 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002516 data = "1234567890"
2517 tests = ("utf-16",
2518 "utf-16-le",
2519 "utf-16-be",
2520 "utf-32",
2521 "utf-32-le",
2522 "utf-32-be")
2523 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002524 buf = self.BytesIO()
2525 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002526 # Check if the BOM is written only once (see issue1753).
2527 f.write(data)
2528 f.write(data)
2529 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002530 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002531 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002532 self.assertEqual(f.read(), data * 2)
2533 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002534
Benjamin Petersona1b49012009-03-31 23:11:32 +00002535 def test_unreadable(self):
2536 class UnReadable(self.BytesIO):
2537 def readable(self):
2538 return False
2539 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002540 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002541
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002542 def test_read_one_by_one(self):
2543 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002544 reads = ""
2545 while True:
2546 c = txt.read(1)
2547 if not c:
2548 break
2549 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002550 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002551
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002552 def test_readlines(self):
2553 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2554 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2555 txt.seek(0)
2556 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2557 txt.seek(0)
2558 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2559
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002560 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002561 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002562 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002563 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002564 reads = ""
2565 while True:
2566 c = txt.read(128)
2567 if not c:
2568 break
2569 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002570 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002571
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002572 def test_writelines(self):
2573 l = ['ab', 'cd', 'ef']
2574 buf = self.BytesIO()
2575 txt = self.TextIOWrapper(buf)
2576 txt.writelines(l)
2577 txt.flush()
2578 self.assertEqual(buf.getvalue(), b'abcdef')
2579
2580 def test_writelines_userlist(self):
2581 l = UserList(['ab', 'cd', 'ef'])
2582 buf = self.BytesIO()
2583 txt = self.TextIOWrapper(buf)
2584 txt.writelines(l)
2585 txt.flush()
2586 self.assertEqual(buf.getvalue(), b'abcdef')
2587
2588 def test_writelines_error(self):
2589 txt = self.TextIOWrapper(self.BytesIO())
2590 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2591 self.assertRaises(TypeError, txt.writelines, None)
2592 self.assertRaises(TypeError, txt.writelines, b'abc')
2593
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002594 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002595 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002596
2597 # read one char at a time
2598 reads = ""
2599 while True:
2600 c = txt.read(1)
2601 if not c:
2602 break
2603 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002604 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002605
2606 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002607 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002608 txt._CHUNK_SIZE = 4
2609
2610 reads = ""
2611 while True:
2612 c = txt.read(4)
2613 if not c:
2614 break
2615 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002616 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002617
2618 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002619 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002620 txt._CHUNK_SIZE = 4
2621
2622 reads = txt.read(4)
2623 reads += txt.read(4)
2624 reads += txt.readline()
2625 reads += txt.readline()
2626 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002627 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002628
2629 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002630 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002631 txt._CHUNK_SIZE = 4
2632
2633 reads = txt.read(4)
2634 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002635 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002636
2637 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002638 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002639 txt._CHUNK_SIZE = 4
2640
2641 reads = txt.read(4)
2642 pos = txt.tell()
2643 txt.seek(0)
2644 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002645 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002646
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002647 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002648 buffer = self.BytesIO(self.testdata)
2649 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002650
2651 self.assertEqual(buffer.seekable(), txt.seekable())
2652
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN

    def file_bytes():
        # Raw contents of the scratch file.
        with self.open(filename, 'rb') as f:
            return f.read()

    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            f.tell()
        self.assertEqual(file_bytes(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        self.assertEqual(file_bytes(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002667
def test_seek_bom(self):
    # Same test, but when seeking manually
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            end_of_first = f.tell()
        with self.open(filename, 'r+', encoding=charset) as f:
            # Write the tail first, then go back and overwrite the head.
            f.seek(end_of_first)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(filename, 'rb') as f:
            contents = f.read()
        self.assertEqual(contents, 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002682
def test_seek_append_bom(self):
    # Same test, but first seek to the start and then to the end
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
        with self.open(filename, 'a', encoding=charset) as f:
            f.seek(0)
            f.seek(0, self.SEEK_END)
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            contents = f.read()
        self.assertEqual(contents, 'aaaxxx'.encode(charset))
2695
def test_errors_property(self):
    # f.errors is "strict" when nothing is passed, otherwise it is the
    # handler that was passed to open().
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for options, expected in cases:
        with self.open(support.TESTFN, "w", **options) as f:
            self.assertEqual(f.errors, expected)
2701
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def writer(n):
            text = "Thread%03d\n" % n
            # Every thread blocks here until the event is set.
            start.wait()
            f.write(text)

        threads = []
        for index in range(20):
            threads.append(threading.Thread(target=writer, args=(index,)))
        with support.start_threads(threads, start.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for n in range(20):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002720
Antoine Pitrou6be88762010-05-03 16:48:20 +00002721 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002722 # Test that text file is closed despite failed flush
2723 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00002724 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002725 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00002726 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002727 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002728 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002729 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002730 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002731 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002732 self.assertTrue(txt.buffer.closed)
2733 self.assertTrue(closed) # flush() called
2734 self.assertFalse(closed[0]) # flush() called before file closed
2735 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02002736 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00002737
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03002738 def test_close_error_on_close(self):
2739 buffer = self.BytesIO(self.testdata)
2740 def bad_flush():
2741 raise OSError('flush')
2742 def bad_close():
2743 raise OSError('close')
2744 buffer.close = bad_close
2745 txt = self.TextIOWrapper(buffer, encoding="ascii")
2746 txt.flush = bad_flush
2747 with self.assertRaises(OSError) as err: # exception not swallowed
2748 txt.close()
2749 self.assertEqual(err.exception.args, ('close',))
2750 self.assertIsInstance(err.exception.__context__, OSError)
2751 self.assertEqual(err.exception.__context__.args, ('flush',))
2752 self.assertFalse(txt.closed)
2753
2754 def test_nonnormalized_close_error_on_close(self):
2755 # Issue #21677
2756 buffer = self.BytesIO(self.testdata)
2757 def bad_flush():
2758 raise non_existing_flush
2759 def bad_close():
2760 raise non_existing_close
2761 buffer.close = bad_close
2762 txt = self.TextIOWrapper(buffer, encoding="ascii")
2763 txt.flush = bad_flush
2764 with self.assertRaises(NameError) as err: # exception not swallowed
2765 txt.close()
2766 self.assertIn('non_existing_close', str(err.exception))
2767 self.assertIsInstance(err.exception.__context__, NameError)
2768 self.assertIn('non_existing_flush', str(err.exception.__context__))
2769 self.assertFalse(txt.closed)
2770
Antoine Pitrou6be88762010-05-03 16:48:20 +00002771 def test_multi_close(self):
2772 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2773 txt.close()
2774 txt.close()
2775 txt.close()
2776 self.assertRaises(ValueError, txt.flush)
2777
def test_unseekable(self):
    """tell() and seek() raise UnsupportedOperation on an unseekable raw."""
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    unsupported = self.UnsupportedOperation
    with self.assertRaises(unsupported):
        wrapper.tell()
    with self.assertRaises(unsupported):
        wrapper.seek(0)
2782
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002783 def test_readonly_attributes(self):
2784 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2785 buf = self.BytesIO(self.testdata)
2786 with self.assertRaises(AttributeError):
2787 txt.buffer = buf
2788
def test_rawio(self):
    """TextIOWrapper can read directly from a raw I/O object."""
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    first_four = wrapper.read(4)
    self.assertEqual(first_four, 'abcd')
    rest_of_line = wrapper.readline()
    self.assertEqual(rest_of_line, 'efghi\n')
    remaining_lines = list(wrapper)
    self.assertEqual(remaining_lines, ['jkl\n', 'opq\n'])
2799
def test_rawio_write_through(self):
    """Issue #12591: with write_through=True, writes don't need a flush."""
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    written = b''.join(raw._write_stack)
    self.assertEqual(written, b'123\n45')
2809
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02002810 def test_bufio_write_through(self):
2811 # Issue #21396: write_through=True doesn't force a flush()
2812 # on the underlying binary buffered object.
2813 flush_called, write_called = [], []
2814 class BufferedWriter(self.BufferedWriter):
2815 def flush(self, *args, **kwargs):
2816 flush_called.append(True)
2817 return super().flush(*args, **kwargs)
2818 def write(self, *args, **kwargs):
2819 write_called.append(True)
2820 return super().write(*args, **kwargs)
2821
2822 rawio = self.BytesIO()
2823 data = b"a"
2824 bufio = BufferedWriter(rawio, len(data)*2)
2825 textio = self.TextIOWrapper(bufio, encoding='ascii',
2826 write_through=True)
2827 # write to the buffered io but don't overflow the buffer
2828 text = data.decode('ascii')
2829 textio.write(text)
2830
2831 # buffer.flush is not called with write_through=True
2832 self.assertFalse(flush_called)
2833 # buffer.write *is* called with write_through=True
2834 self.assertTrue(write_called)
2835 self.assertEqual(rawio.getvalue(), b"") # no flush
2836
2837 write_called = [] # reset
2838 textio.write(text * 10) # total content is larger than bufio buffer
2839 self.assertTrue(write_called)
2840 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
2841
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002842 def test_read_nonbytes(self):
2843 # Issue #17106
2844 # Crash when underlying read() returns non-bytes
2845 t = self.TextIOWrapper(self.StringIO('a'))
2846 self.assertRaises(TypeError, t.read, 1)
2847 t = self.TextIOWrapper(self.StringIO('a'))
2848 self.assertRaises(TypeError, t.readline)
2849 t = self.TextIOWrapper(self.StringIO('a'))
2850 self.assertRaises(TypeError, t.read)
2851
2852 def test_illegal_decoder(self):
2853 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10002854 # Bypass the early encoding check added in issue 20404
2855 def _make_illegal_wrapper():
2856 quopri = codecs.lookup("quopri")
2857 quopri._is_text_encoding = True
2858 try:
2859 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2860 newline='\n', encoding="quopri")
2861 finally:
2862 quopri._is_text_encoding = False
2863 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002864 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10002865 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002866 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10002867 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002868 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10002869 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002870 self.assertRaises(TypeError, t.read)
2871
def _check_create_at_shutdown(self, **kwargs):
    """Run a child interpreter that builds a TextIOWrapper in ``__del__``.

    *kwargs* are interpolated into the TextIOWrapper() call of the child
    script.  Returns whatever assert_python_ok() returns.
    """
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    template = """if 1:
        import codecs
        import {iomod} as io

        # Avoid looking up codecs at shutdown
        codecs.lookup('utf-8')

        class C:
            def __init__(self):
                self.buf = io.BytesIO()
            def __del__(self):
                io.TextIOWrapper(self.buf, **{kwargs})
                print("ok")
        c = C()
        """
    script = template.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
2892
2893 def test_create_at_shutdown_without_encoding(self):
2894 rc, out, err = self._check_create_at_shutdown()
2895 if err:
2896 # Can error out with a RuntimeError if the module state
2897 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10002898 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01002899 else:
2900 self.assertEqual("ok", out.decode().strip())
2901
2902 def test_create_at_shutdown_with_encoding(self):
2903 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
2904 errors='strict')
2905 self.assertFalse(err)
2906 self.assertEqual("ok", out.decode().strip())
2907
Benjamin Peterson6c14f232014-11-12 10:19:46 -05002908 def test_issue22849(self):
2909 class F(object):
2910 def readable(self): return True
2911 def writable(self): return True
2912 def seekable(self): return True
2913
2914 for i in range(10):
2915 try:
2916 self.TextIOWrapper(F(), encoding='utf-8')
2917 except Exception:
2918 pass
2919
2920 F.tell = lambda x: 0
2921 t = self.TextIOWrapper(F(), encoding='utf-8')
2922
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002923
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapper tests bound to the ``io`` module."""

    io = io
    # Message looked for by the shutdown test when the child errors out.
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        """A failed re-initialization leaves the wrapper unusable."""
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.read)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
        self.assertRaises(ValueError, t.read)

        # An object created without running __init__ cannot be repr()'d.
        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, t)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            rawio = io.FileIO(support.TESTFN, "wb")
            b = self.BufferedWriter(rawio)
            t = self.TextIOWrapper(b, encoding="ascii")
            t.write("456def")
            # Self-reference: only the cyclic collector can free it.
            t.x = t
            wr = weakref.ref(t)
            del t
            support.gc_collect()
        # assertIsNone reports the surviving object if the check fails.
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for i in range(1000):
            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t1 = self.TextIOWrapper(b1, encoding="ascii")
            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t2 = self.TextIOWrapper(b2, encoding="ascii")
            # circular references
            t1.buddy = t2
            t2.buddy = t1
        support.gc_collect()
2970
2971
class PyTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapper tests bound to the ``pyio`` module."""

    io = pyio
    #shutdown_error = "LookupError: unknown encoding: ascii"
    shutdown_error = "TypeError: 'NoneType' object is not iterable"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002976
2977
2978class IncrementalNewlineDecoderTest(unittest.TestCase):
2979
2980 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002981 # UTF-8 specific tests for a newline decoder
2982 def _check_decode(b, s, **kwargs):
2983 # We exercise getstate() / setstate() as well as decode()
2984 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002985 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002986 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002987 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002988
Antoine Pitrou180a3362008-12-14 16:36:46 +00002989 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002990
Antoine Pitrou180a3362008-12-14 16:36:46 +00002991 _check_decode(b'\xe8', "")
2992 _check_decode(b'\xa2', "")
2993 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002994
Antoine Pitrou180a3362008-12-14 16:36:46 +00002995 _check_decode(b'\xe8', "")
2996 _check_decode(b'\xa2', "")
2997 _check_decode(b'\x88', "\u8888")
2998
2999 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003000 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3001
Antoine Pitrou180a3362008-12-14 16:36:46 +00003002 decoder.reset()
3003 _check_decode(b'\n', "\n")
3004 _check_decode(b'\r', "")
3005 _check_decode(b'', "\n", final=True)
3006 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003007
Antoine Pitrou180a3362008-12-14 16:36:46 +00003008 _check_decode(b'\r', "")
3009 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003010
Antoine Pitrou180a3362008-12-14 16:36:46 +00003011 _check_decode(b'\r\r\n', "\n\n")
3012 _check_decode(b'\r', "")
3013 _check_decode(b'\r', "\n")
3014 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003015
Antoine Pitrou180a3362008-12-14 16:36:46 +00003016 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3017 _check_decode(b'\xe8\xa2\x88', "\u8888")
3018 _check_decode(b'\n', "\n")
3019 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3020 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003021
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003022 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003023 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003024 if encoding is not None:
3025 encoder = codecs.getincrementalencoder(encoding)()
3026 def _decode_bytewise(s):
3027 # Decode one byte at a time
3028 for b in encoder.encode(s):
3029 result.append(decoder.decode(bytes([b])))
3030 else:
3031 encoder = None
3032 def _decode_bytewise(s):
3033 # Decode one char at a time
3034 for c in s:
3035 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003036 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003037 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003038 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003039 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003040 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003041 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003042 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003043 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003044 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003045 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003046 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003047 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003048 input = "abc"
3049 if encoder is not None:
3050 encoder.reset()
3051 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003052 self.assertEqual(decoder.decode(input), "abc")
3053 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003054
3055 def test_newline_decoder(self):
3056 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003057 # None meaning the IncrementalNewlineDecoder takes unicode input
3058 # rather than bytes input
3059 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003060 'utf-16', 'utf-16-le', 'utf-16-be',
3061 'utf-32', 'utf-32-le', 'utf-32-be',
3062 )
3063 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003064 decoder = enc and codecs.getincrementaldecoder(enc)()
3065 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3066 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003067 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003068 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3069 self.check_newline_decoding_utf8(decoder)
3070
Antoine Pitrou66913e22009-03-06 23:40:56 +00003071 def test_newline_bytes(self):
3072 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3073 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003074 self.assertEqual(dec.newlines, None)
3075 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3076 self.assertEqual(dec.newlines, None)
3077 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3078 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003079 dec = self.IncrementalNewlineDecoder(None, translate=False)
3080 _check(dec)
3081 dec = self.IncrementalNewlineDecoder(None, translate=True)
3082 _check(dec)
3083
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Runs the inherited tests unchanged; nothing is added here."""
3086
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Runs the inherited tests unchanged; nothing is added here."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00003089
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003090
Guido van Rossum01a27522007-03-07 01:00:12 +00003091# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003092
class MiscIOTest(unittest.TestCase):
    """Miscellaneous tests of an io implementation.

    The module under test is read from ``self.io``; names such as
    ``self.open``, ``self.IOBase`` and ``self.BlockingIOError`` are used
    but not defined in this class.
    """

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        """Every name in ``__all__`` exists and has the expected kind."""
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        """Check ``mode`` and ``name`` on each layer of opened files."""
        # Every file is held in a ``with`` block so that it is closed
        # even when one of the assertions fails.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        # Only the open() call itself is expected to warn.
        with support.check_warnings(('', DeprecationWarning)):
            f = self.open(support.TESTFN, "U")
        with f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second file object sharing f's descriptor without owning it.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        """Operations on a closed file raise ValueError, for many modes."""
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # Methods that exist only on some layers are checked if present.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readall"):
                self.assertRaises(ValueError, f.readall)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Build a reference cycle between the exception and its argument.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check opened files against the ABCs found on *abcmodule*."""
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    def _check_warn_on_dealloc(self, *args, **kwargs):
        """Open a file, drop it unclosed, and expect a ResourceWarning."""
        f = open(*args, **kwargs)
        r = repr(f)
        with self.assertWarns(ResourceWarning) as cm:
            f = None
            support.gc_collect()
        # The warning text must contain the repr of the lost file.
        self.assertIn(r, str(cm.warning.args[0]))

    def test_warn_on_dealloc(self):
        self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
        self._check_warn_on_dealloc(support.TESTFN, "wb")
        self._check_warn_on_dealloc(support.TESTFN, "w")

    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
        """Same check for files opened from a pipe file descriptor."""
        fds = []
        def cleanup_fds():
            for fd in fds:
                try:
                    os.close(fd)
                except OSError as e:
                    # A descriptor that is already closed is not an error.
                    if e.errno != errno.EBADF:
                        raise
        self.addCleanup(cleanup_fds)
        r, w = os.pipe()
        fds += r, w
        self._check_warn_on_dealloc(r, *args, **kwargs)
        # When using closefd=False, there's no warning
        r, w = os.pipe()
        fds += r, w
        with warnings.catch_warnings(record=True) as recorded:
            open(r, *args, closefd=False, **kwargs)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_warn_on_dealloc_fd(self):
        self._check_warn_on_dealloc_fd("rb", buffering=0)
        self._check_warn_on_dealloc_fd("rb")
        self._check_warn_on_dealloc_fd("r")

    def test_pickling(self):
        # Pickling file objects is forbidden
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+b", "buffering": 0},
            ]:
            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
                with self.open(support.TESTFN, **kwargs) as f:
                    self.assertRaises(TypeError, pickle.dumps, f, protocol)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_bigbuf(self):
        self._test_nonblock_pipe_write(16*1024)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_smallbuf(self):
        self._test_nonblock_pipe_write(1024)

    def _set_non_blocking(self, fd):
        """Add O_NONBLOCK to the status flags of descriptor *fd*."""
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        self.assertNotEqual(flags, -1)
        res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        self.assertEqual(res, 0)

    def _test_nonblock_pipe_write(self, bufsize):
        """Write through a non-blocking pipe and compare sent with received.

        *bufsize* is the buffer size given to both ends of the pipe.
        """
        sent = []
        received = []
        r, w = os.pipe()
        self._set_non_blocking(r)
        self._set_non_blocking(w)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for N in 9999, 73, 7574:
                try:
                    i = 0
                    # Keep writing until the write raises BlockingIOError.
                    while True:
                        msg = bytes([i % 26 + 97]) * N
                        sent.append(msg)
                        wf.write(msg)
                        i += 1

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    # Only part of the last message was accepted.
                    sent[-1] = sent[-1][:e.characters_written]
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # Read repeatedly until read() returns None.
            received += iter(rf.read, None)

        sent, received = b''.join(sent), b''.join(received)
        self.assertTrue(sent == received)
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)

    def test_create_fail(self):
        # 'x' mode fails if file is existing
        with self.open(support.TESTFN, 'w'):
            pass
        self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')

    def test_create_writes(self):
        # 'x' mode opens for writing
        with self.open(support.TESTFN, 'xb') as f:
            f.write(b"spam")
        with self.open(support.TESTFN, 'rb') as f:
            self.assertEqual(b"spam", f.read())

    def test_open_allargs(self):
        # there used to be a buffer overflow in the parser for rawmode
        self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3363
3364
class CMiscIOTest(MiscIOTest):
    """Miscellaneous tests bound to the ``io`` module."""

    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        class OversizedReader(self.io.BufferedIOBase):
            # read() hands back far more data than any caller asked for.
            def read(self, n=-1):
                return b'x' * 10**6

        target = bytearray(2)
        reader = OversizedReader()
        with self.assertRaises(ValueError):
            reader.readinto(target)

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        """Run a child whose daemon and main threads both write to a stream.

        *stream_name* is the name of the ``sys`` attribute to write to.
        """
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        template = """if 1:
            import sys
            import time
            import threading

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """
        code = template.format(stream_name=stream_name)
        res, _ = run_python_until_end("-c", code)
        err = res.err.decode()
        if res.rc == 0:
            # Success: stderr holds nothing besides the written characters.
            self.assertFalse(err.strip('.!'))
            return
        # Failure: should be a fatal error
        expected = ("Fatal Python error: could not acquire lock "
                    "for <_io.BufferedWriter name='<{stream_name}>'> "
                    "at interpreter shutdown, possibly due to "
                    "daemon threads").format(stream_name=stream_name)
        self.assertIn(expected, err)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stderr')
3418
3419
class PyMiscIOTest(MiscIOTest):
    """Miscellaneous tests bound to the ``pyio`` module."""

    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003422
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003423
3424@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3425class SignalsTest(unittest.TestCase):
3426
def setUp(self):
    """Install ``alarm_interrupt`` for SIGALRM, keeping the old handler."""
    previous = signal.signal(signal.SIGALRM, self.alarm_interrupt)
    self.oldalrm = previous
3429
3430 def tearDown(self):
3431 signal.signal(signal.SIGALRM, self.oldalrm)
3432
3433 def alarm_interrupt(self, sig, frame):
3434 1/0
3435
3436 @unittest.skipUnless(threading, 'Threading required for this test.')
3437 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3438 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003439 invokes the signal handler, and bubbles up the exception raised
3440 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003441 read_results = []
3442 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003443 if hasattr(signal, 'pthread_sigmask'):
3444 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003445 s = os.read(r, 1)
3446 read_results.append(s)
3447 t = threading.Thread(target=_read)
3448 t.daemon = True
3449 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003450 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003451 try:
3452 wio = self.io.open(w, **fdopen_kwargs)
3453 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003454 # Fill the pipe enough that the write will be blocking.
3455 # It will be interrupted by the timer armed above. Since the
3456 # other thread has read one byte, the low-level write will
3457 # return with a successful (partial) result rather than an EINTR.
3458 # The buffered IO layer must check for pending signal
3459 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003460 signal.alarm(1)
3461 try:
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003462 with self.assertRaises(ZeroDivisionError):
3463 wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
Victor Stinner775b2dd2013-07-15 19:53:13 +02003464 finally:
3465 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003466 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003467 # We got one byte, get another one and check that it isn't a
3468 # repeat of the first one.
3469 read_results.append(os.read(r, 1))
3470 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3471 finally:
3472 os.close(w)
3473 os.close(r)
3474 # This is deliberate. If we didn't close the file descriptor
3475 # before closing wio, wio would try to flush its internal
3476 # buffer, and block again.
3477 try:
3478 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003479 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003480 if e.errno != errno.EBADF:
3481 raise
3482
3483 def test_interrupted_write_unbuffered(self):
3484 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3485
3486 def test_interrupted_write_buffered(self):
3487 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3488
Victor Stinner6ab72862014-09-03 23:32:28 +02003489 # Issue #22331: The test hangs on FreeBSD 7.2
3490 @support.requires_freebsd_version(8)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003491 def test_interrupted_write_text(self):
3492 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3493
Brett Cannon31f59292011-02-21 19:29:56 +00003494 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003495 def check_reentrant_write(self, data, **fdopen_kwargs):
3496 def on_alarm(*args):
3497 # Will be called reentrantly from the same thread
3498 wio.write(data)
3499 1/0
3500 signal.signal(signal.SIGALRM, on_alarm)
3501 r, w = os.pipe()
3502 wio = self.io.open(w, **fdopen_kwargs)
3503 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003504 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003505 # Either the reentrant call to wio.write() fails with RuntimeError,
3506 # or the signal handler raises ZeroDivisionError.
3507 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3508 while 1:
3509 for i in range(100):
3510 wio.write(data)
3511 wio.flush()
3512 # Make sure the buffer doesn't fill up and block further writes
3513 os.read(r, len(data) * 100)
3514 exc = cm.exception
3515 if isinstance(exc, RuntimeError):
3516 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3517 finally:
3518 wio.close()
3519 os.close(r)
3520
3521 def test_reentrant_write_buffered(self):
3522 self.check_reentrant_write(b"xy", mode="wb")
3523
3524 def test_reentrant_write_text(self):
3525 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3526
Antoine Pitrou707ce822011-02-25 21:24:11 +00003527 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3528 """Check that a buffered read, when it gets interrupted (either
3529 returning a partial result or EINTR), properly invokes the signal
3530 handler and retries if the latter returned successfully."""
3531 r, w = os.pipe()
3532 fdopen_kwargs["closefd"] = False
3533 def alarm_handler(sig, frame):
3534 os.write(w, b"bar")
3535 signal.signal(signal.SIGALRM, alarm_handler)
3536 try:
3537 rio = self.io.open(r, **fdopen_kwargs)
3538 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003539 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003540 # Expected behaviour:
3541 # - first raw read() returns partial b"foo"
3542 # - second raw read() returns EINTR
3543 # - third raw read() returns b"bar"
3544 self.assertEqual(decode(rio.read(6)), "foobar")
3545 finally:
3546 rio.close()
3547 os.close(w)
3548 os.close(r)
3549
Antoine Pitrou20db5112011-08-19 20:32:34 +02003550 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003551 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3552 mode="rb")
3553
Antoine Pitrou20db5112011-08-19 20:32:34 +02003554 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003555 self.check_interrupted_read_retry(lambda x: x,
3556 mode="r")
3557
3558 @unittest.skipUnless(threading, 'Threading required for this test.')
3559 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3560 """Check that a buffered write, when it gets interrupted (either
3561 returning a partial result or EINTR), properly invokes the signal
3562 handler and retries if the latter returned successfully."""
3563 select = support.import_module("select")
3564 # A quantity that exceeds the buffer size of an anonymous pipe's
3565 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02003566 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00003567 r, w = os.pipe()
3568 fdopen_kwargs["closefd"] = False
3569 # We need a separate thread to read from the pipe and allow the
3570 # write() to finish. This thread is started after the SIGALRM is
3571 # received (forcing a first EINTR in write()).
3572 read_results = []
3573 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003574 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00003575 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003576 try:
3577 while not write_finished:
3578 while r in select.select([r], [], [], 1.0)[0]:
3579 s = os.read(r, 1024)
3580 read_results.append(s)
3581 except BaseException as exc:
3582 nonlocal error
3583 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00003584 t = threading.Thread(target=_read)
3585 t.daemon = True
3586 def alarm1(sig, frame):
3587 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003588 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003589 def alarm2(sig, frame):
3590 t.start()
3591 signal.signal(signal.SIGALRM, alarm1)
3592 try:
3593 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003594 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003595 # Expected behaviour:
3596 # - first raw write() is partial (because of the limited pipe buffer
3597 # and the first alarm)
3598 # - second raw write() returns EINTR (because of the second alarm)
3599 # - subsequent write()s are successful (either partial or complete)
3600 self.assertEqual(N, wio.write(item * N))
3601 wio.flush()
3602 write_finished = True
3603 t.join()
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003604
3605 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003606 self.assertEqual(N, sum(len(x) for x in read_results))
3607 finally:
3608 write_finished = True
3609 os.close(w)
3610 os.close(r)
3611 # This is deliberate. If we didn't close the file descriptor
3612 # before closing wio, wio would try to flush its internal
3613 # buffer, and could block (in case of failure).
3614 try:
3615 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003616 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00003617 if e.errno != errno.EBADF:
3618 raise
3619
Antoine Pitrou20db5112011-08-19 20:32:34 +02003620 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003621 self.check_interrupted_write_retry(b"x", mode="wb")
3622
Antoine Pitrou20db5112011-08-19 20:32:34 +02003623 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003624 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3625
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003626
class CSignalsTest(SignalsTest):
    # Run the SignalsTest cases against the `io` module.
    io = io
3629
class PySignalsTest(SignalsTest):
    # Run the SignalsTest cases against the `pyio` implementation.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
3637
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003638
def load_tests(*args):
    """Build the test suite for this module (unittest load_tests protocol).

    Before the suite is assembled, the public names of the io
    implementation under test and the matching mock classes are set as
    attributes on every test class whose name starts with "C" (taken from
    `io`) or "Py" (taken from `pyio`).  Other test classes are left
    untouched.

    When the protocol supplies a loader as first positional argument it is
    used to collect the tests; otherwise unittest.defaultTestLoader is.
    Returns a unittest.TestSuite.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock Foo has "CFoo" and "PyFoo" variants at module level; expose
    # the right one under the plain name "Foo".
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    # unittest.makeSuite() is deprecated; go through a TestLoader instead,
    # preferring the one handed to us by the load_tests protocol.
    loader = args[0] if args else unittest.defaultTestLoader
    suite = unittest.TestSuite(
        [loader.loadTestsFromTestCase(test) for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003674
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()