blob: ea109acaa0c5c3d95b97c617912733f74ebb23c4 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Antoine Pitrou25f85d42015-04-13 19:41:47 +020038from test.script_helper import assert_python_ok, run_python_until_end
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010047try:
48 import fcntl
49except ImportError:
50 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000052def _default_chunk_size():
53 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000054 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000055 return f._CHUNK_SIZE
56
57
Antoine Pitrou328ec742010-09-14 18:37:24 +000058class MockRawIOWithoutRead:
59 """A RawIO implementation without read(), so as to exercise the default
60 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000061
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000062 def __init__(self, read_stack=()):
63 self._read_stack = list(read_stack)
64 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000065 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000066 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000067
Guido van Rossum01a27522007-03-07 01:00:12 +000068 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000069 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000070 return len(b)
71
72 def writable(self):
73 return True
74
Guido van Rossum68bbcd22007-02-27 17:19:33 +000075 def fileno(self):
76 return 42
77
78 def readable(self):
79 return True
80
Guido van Rossum01a27522007-03-07 01:00:12 +000081 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000082 return True
83
Guido van Rossum01a27522007-03-07 01:00:12 +000084 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000085 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000086
87 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000088 return 0 # same comment as above
89
90 def readinto(self, buf):
91 self._reads += 1
92 max_len = len(buf)
93 try:
94 data = self._read_stack[0]
95 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000096 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000097 return 0
98 if data is None:
99 del self._read_stack[0]
100 return None
101 n = len(data)
102 if len(data) <= max_len:
103 del self._read_stack[0]
104 buf[:n] = data
105 return n
106 else:
107 buf[:] = data[:max_len]
108 self._read_stack[0] = data[max_len:]
109 return max_len
110
111 def truncate(self, pos=None):
112 return pos
113
Antoine Pitrou328ec742010-09-14 18:37:24 +0000114class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
115 pass
116
117class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
118 pass
119
120
121class MockRawIO(MockRawIOWithoutRead):
122
123 def read(self, n=None):
124 self._reads += 1
125 try:
126 return self._read_stack.pop(0)
127 except:
128 self._extraneous_reads += 1
129 return b""
130
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000131class CMockRawIO(MockRawIO, io.RawIOBase):
132 pass
133
134class PyMockRawIO(MockRawIO, pyio.RawIOBase):
135 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000136
Guido van Rossuma9e20242007-03-08 00:43:48 +0000137
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000138class MisbehavedRawIO(MockRawIO):
139 def write(self, b):
140 return super().write(b) * 2
141
142 def read(self, n=None):
143 return super().read(n) * 2
144
145 def seek(self, pos, whence):
146 return -123
147
148 def tell(self):
149 return -456
150
151 def readinto(self, buf):
152 super().readinto(buf)
153 return len(buf) * 5
154
155class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
156 pass
157
158class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
159 pass
160
161
162class CloseFailureIO(MockRawIO):
163 closed = 0
164
165 def close(self):
166 if not self.closed:
167 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200168 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169
170class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
171 pass
172
173class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
174 pass
175
176
177class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000178
179 def __init__(self, data):
180 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000182
183 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000184 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000185 self.read_history.append(None if res is None else len(res))
186 return res
187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188 def readinto(self, b):
189 res = super().readinto(b)
190 self.read_history.append(res)
191 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000192
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000193class CMockFileIO(MockFileIO, io.BytesIO):
194 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000195
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000196class PyMockFileIO(MockFileIO, pyio.BytesIO):
197 pass
198
199
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000200class MockUnseekableIO:
201 def seekable(self):
202 return False
203
204 def seek(self, *args):
205 raise self.UnsupportedOperation("not seekable")
206
207 def tell(self, *args):
208 raise self.UnsupportedOperation("not seekable")
209
210class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
211 UnsupportedOperation = io.UnsupportedOperation
212
213class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
214 UnsupportedOperation = pyio.UnsupportedOperation
215
216
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000217class MockNonBlockWriterIO:
218
219 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000220 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000221 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000222
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000223 def pop_written(self):
224 s = b"".join(self._write_stack)
225 self._write_stack[:] = []
226 return s
227
228 def block_on(self, char):
229 """Block when a given char is encountered."""
230 self._blocker_char = char
231
232 def readable(self):
233 return True
234
235 def seekable(self):
236 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
Guido van Rossum01a27522007-03-07 01:00:12 +0000238 def writable(self):
239 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000240
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000241 def write(self, b):
242 b = bytes(b)
243 n = -1
244 if self._blocker_char:
245 try:
246 n = b.index(self._blocker_char)
247 except ValueError:
248 pass
249 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100250 if n > 0:
251 # write data up to the first blocker
252 self._write_stack.append(b[:n])
253 return n
254 else:
255 # cancel blocker and indicate would block
256 self._blocker_char = None
257 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000258 self._write_stack.append(b)
259 return len(b)
260
261class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
262 BlockingIOError = io.BlockingIOError
263
264class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
265 BlockingIOError = pyio.BlockingIOError
266
Guido van Rossuma9e20242007-03-08 00:43:48 +0000267
Guido van Rossum28524c72007-02-27 05:47:44 +0000268class IOTest(unittest.TestCase):
269
Neal Norwitze7789b12008-03-24 06:18:09 +0000270 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000271 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000272
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000273 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000274 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000275
Guido van Rossum28524c72007-02-27 05:47:44 +0000276 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000277 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000278 f.truncate(0)
279 self.assertEqual(f.tell(), 5)
280 f.seek(0)
281
282 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000283 self.assertEqual(f.seek(0), 0)
284 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000285 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000286 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000287 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000288 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000289 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000290 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000291 self.assertEqual(f.seek(-1, 2), 13)
292 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293
Guido van Rossum87429772007-04-10 21:06:59 +0000294 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000295 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000296 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000297
Guido van Rossum9b76da62007-04-11 01:09:03 +0000298 def read_ops(self, f, buffered=False):
299 data = f.read(5)
300 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000301 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000302 self.assertEqual(f.readinto(data), 5)
303 self.assertEqual(data, b" worl")
304 self.assertEqual(f.readinto(data), 2)
305 self.assertEqual(len(data), 5)
306 self.assertEqual(data[:2], b"d\n")
307 self.assertEqual(f.seek(0), 0)
308 self.assertEqual(f.read(20), b"hello world\n")
309 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000310 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000311 self.assertEqual(f.seek(-6, 2), 6)
312 self.assertEqual(f.read(5), b"world")
313 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000314 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 self.assertEqual(f.seek(-6, 1), 5)
316 self.assertEqual(f.read(5), b" worl")
317 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000318 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000319 if buffered:
320 f.seek(0)
321 self.assertEqual(f.read(), b"hello world\n")
322 f.seek(6)
323 self.assertEqual(f.read(), b"world\n")
324 self.assertEqual(f.read(), b"")
325
Guido van Rossum34d69e52007-04-10 20:08:41 +0000326 LARGE = 2**31
327
Guido van Rossum53807da2007-04-10 19:01:47 +0000328 def large_file_ops(self, f):
329 assert f.readable()
330 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000331 self.assertEqual(f.seek(self.LARGE), self.LARGE)
332 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000333 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000334 self.assertEqual(f.tell(), self.LARGE + 3)
335 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000336 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000337 self.assertEqual(f.tell(), self.LARGE + 2)
338 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000339 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000340 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000341 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
342 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000343 self.assertEqual(f.read(2), b"x")
344
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000345 def test_invalid_operations(self):
346 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000347 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000348 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000349 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000350 self.assertRaises(exc, fp.read)
351 self.assertRaises(exc, fp.readline)
352 with self.open(support.TESTFN, "wb", buffering=0) as fp:
353 self.assertRaises(exc, fp.read)
354 self.assertRaises(exc, fp.readline)
355 with self.open(support.TESTFN, "rb", buffering=0) as fp:
356 self.assertRaises(exc, fp.write, b"blah")
357 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000358 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000359 self.assertRaises(exc, fp.write, b"blah")
360 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000361 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000362 self.assertRaises(exc, fp.write, "blah")
363 self.assertRaises(exc, fp.writelines, ["blah\n"])
364 # Non-zero seeking from current or end pos
365 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
366 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000367
Antoine Pitrou13348842012-01-29 18:36:34 +0100368 def test_open_handles_NUL_chars(self):
369 fn_with_NUL = 'foo\0bar'
370 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
371 self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
372
Guido van Rossum28524c72007-02-27 05:47:44 +0000373 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000374 with self.open(support.TESTFN, "wb", buffering=0) as f:
375 self.assertEqual(f.readable(), False)
376 self.assertEqual(f.writable(), True)
377 self.assertEqual(f.seekable(), True)
378 self.write_ops(f)
379 with self.open(support.TESTFN, "rb", buffering=0) as f:
380 self.assertEqual(f.readable(), True)
381 self.assertEqual(f.writable(), False)
382 self.assertEqual(f.seekable(), True)
383 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000384
Guido van Rossum87429772007-04-10 21:06:59 +0000385 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000386 with self.open(support.TESTFN, "wb") as f:
387 self.assertEqual(f.readable(), False)
388 self.assertEqual(f.writable(), True)
389 self.assertEqual(f.seekable(), True)
390 self.write_ops(f)
391 with self.open(support.TESTFN, "rb") as f:
392 self.assertEqual(f.readable(), True)
393 self.assertEqual(f.writable(), False)
394 self.assertEqual(f.seekable(), True)
395 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000396
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000397 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000398 with self.open(support.TESTFN, "wb") as f:
399 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
400 with self.open(support.TESTFN, "rb") as f:
401 self.assertEqual(f.readline(), b"abc\n")
402 self.assertEqual(f.readline(10), b"def\n")
403 self.assertEqual(f.readline(2), b"xy")
404 self.assertEqual(f.readline(4), b"zzy\n")
405 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000406 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000407 self.assertRaises(TypeError, f.readline, 5.3)
408 with self.open(support.TESTFN, "r") as f:
409 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000410
Guido van Rossum28524c72007-02-27 05:47:44 +0000411 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000412 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000413 self.write_ops(f)
414 data = f.getvalue()
415 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000416 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000417 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000418
Guido van Rossum53807da2007-04-10 19:01:47 +0000419 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000420 # On Windows and Mac OSX this test comsumes large resources; It takes
421 # a long time to build the >2GB file and takes >2GB of disk space
422 # therefore the resource must be enabled to run this test.
423 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600424 support.requires(
425 'largefile',
426 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000427 with self.open(support.TESTFN, "w+b", 0) as f:
428 self.large_file_ops(f)
429 with self.open(support.TESTFN, "w+b") as f:
430 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000431
432 def test_with_open(self):
433 for bufsize in (0, 1, 100):
434 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000435 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000436 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000437 self.assertEqual(f.closed, True)
438 f = None
439 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000440 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000441 1/0
442 except ZeroDivisionError:
443 self.assertEqual(f.closed, True)
444 else:
445 self.fail("1/0 didn't raise an exception")
446
Antoine Pitrou08838b62009-01-21 00:55:13 +0000447 # issue 5008
448 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000449 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000450 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000451 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000452 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000454 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000455 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000456 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000457
Guido van Rossum87429772007-04-10 21:06:59 +0000458 def test_destructor(self):
459 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000460 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000461 def __del__(self):
462 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000463 try:
464 f = super().__del__
465 except AttributeError:
466 pass
467 else:
468 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000469 def close(self):
470 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000471 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000472 def flush(self):
473 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000474 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000475 with support.check_warnings(('', ResourceWarning)):
476 f = MyFileIO(support.TESTFN, "wb")
477 f.write(b"xxx")
478 del f
479 support.gc_collect()
480 self.assertEqual(record, [1, 2, 3])
481 with self.open(support.TESTFN, "rb") as f:
482 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000483
484 def _check_base_destructor(self, base):
485 record = []
486 class MyIO(base):
487 def __init__(self):
488 # This exercises the availability of attributes on object
489 # destruction.
490 # (in the C version, close() is called by the tp_dealloc
491 # function, not by __del__)
492 self.on_del = 1
493 self.on_close = 2
494 self.on_flush = 3
495 def __del__(self):
496 record.append(self.on_del)
497 try:
498 f = super().__del__
499 except AttributeError:
500 pass
501 else:
502 f()
503 def close(self):
504 record.append(self.on_close)
505 super().close()
506 def flush(self):
507 record.append(self.on_flush)
508 super().flush()
509 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000510 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000511 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000512 self.assertEqual(record, [1, 2, 3])
513
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000514 def test_IOBase_destructor(self):
515 self._check_base_destructor(self.IOBase)
516
517 def test_RawIOBase_destructor(self):
518 self._check_base_destructor(self.RawIOBase)
519
520 def test_BufferedIOBase_destructor(self):
521 self._check_base_destructor(self.BufferedIOBase)
522
523 def test_TextIOBase_destructor(self):
524 self._check_base_destructor(self.TextIOBase)
525
Guido van Rossum87429772007-04-10 21:06:59 +0000526 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000527 with self.open(support.TESTFN, "wb") as f:
528 f.write(b"xxx")
529 with self.open(support.TESTFN, "rb") as f:
530 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000531
Guido van Rossumd4103952007-04-12 05:44:49 +0000532 def test_array_writes(self):
533 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000534 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000535 with self.open(support.TESTFN, "wb", 0) as f:
536 self.assertEqual(f.write(a), n)
537 with self.open(support.TESTFN, "wb") as f:
538 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000539
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000540 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000541 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000542 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000543
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000544 def test_read_closed(self):
545 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000546 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000547 with self.open(support.TESTFN, "r") as f:
548 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000549 self.assertEqual(file.read(), "egg\n")
550 file.seek(0)
551 file.close()
552 self.assertRaises(ValueError, file.read)
553
554 def test_no_closefd_with_filename(self):
555 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000556 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000557
558 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000559 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000560 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000561 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000562 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000563 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000564 self.assertEqual(file.buffer.raw.closefd, False)
565
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000566 def test_garbage_collection(self):
567 # FileIO objects are collected, and collecting them flushes
568 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000569 with support.check_warnings(('', ResourceWarning)):
570 f = self.FileIO(support.TESTFN, "wb")
571 f.write(b"abcxxx")
572 f.f = f
573 wr = weakref.ref(f)
574 del f
575 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000576 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000577 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000578 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000579
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000580 def test_unbounded_file(self):
581 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
582 zero = "/dev/zero"
583 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000584 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000585 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000586 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000587 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000588 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000589 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000590 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000591 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000592 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000593 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000594 self.assertRaises(OverflowError, f.read)
595
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200596 def check_flush_error_on_close(self, *args, **kwargs):
597 # Test that the file is closed despite failed flush
598 # and that flush() is called before file closed.
599 f = self.open(*args, **kwargs)
600 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000601 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200602 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200603 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000604 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200605 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600606 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200607 self.assertTrue(closed) # flush() called
608 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200609 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200610
611 def test_flush_error_on_close(self):
612 # raw file
613 # Issue #5700: io.FileIO calls flush() after file closed
614 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
615 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
616 self.check_flush_error_on_close(fd, 'wb', buffering=0)
617 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
618 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
619 os.close(fd)
620 # buffered io
621 self.check_flush_error_on_close(support.TESTFN, 'wb')
622 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
623 self.check_flush_error_on_close(fd, 'wb')
624 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
625 self.check_flush_error_on_close(fd, 'wb', closefd=False)
626 os.close(fd)
627 # text io
628 self.check_flush_error_on_close(support.TESTFN, 'w')
629 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
630 self.check_flush_error_on_close(fd, 'w')
631 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
632 self.check_flush_error_on_close(fd, 'w', closefd=False)
633 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000634
635 def test_multi_close(self):
636 f = self.open(support.TESTFN, "wb", buffering=0)
637 f.close()
638 f.close()
639 f.close()
640 self.assertRaises(ValueError, f.flush)
641
Antoine Pitrou328ec742010-09-14 18:37:24 +0000642 def test_RawIOBase_read(self):
643 # Exercise the default RawIOBase.read() implementation (which calls
644 # readinto() internally).
645 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
646 self.assertEqual(rawio.read(2), b"ab")
647 self.assertEqual(rawio.read(2), b"c")
648 self.assertEqual(rawio.read(2), b"d")
649 self.assertEqual(rawio.read(2), None)
650 self.assertEqual(rawio.read(2), b"ef")
651 self.assertEqual(rawio.read(2), b"g")
652 self.assertEqual(rawio.read(2), None)
653 self.assertEqual(rawio.read(2), b"")
654
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400655 def test_types_have_dict(self):
656 test = (
657 self.IOBase(),
658 self.RawIOBase(),
659 self.TextIOBase(),
660 self.StringIO(),
661 self.BytesIO()
662 )
663 for obj in test:
664 self.assertTrue(hasattr(obj, "__dict__"))
665
Ross Lagerwall59142db2011-10-31 20:34:46 +0200666 def test_opener(self):
667 with self.open(support.TESTFN, "w") as f:
668 f.write("egg\n")
669 fd = os.open(support.TESTFN, os.O_RDONLY)
670 def opener(path, flags):
671 return fd
672 with self.open("non-existent", "r", opener=opener) as f:
673 self.assertEqual(f.read(), "egg\n")
674
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200675 def test_fileio_closefd(self):
676 # Issue #4841
677 with self.open(__file__, 'rb') as f1, \
678 self.open(__file__, 'rb') as f2:
679 fileio = self.FileIO(f1.fileno(), closefd=False)
680 # .__init__() must not close f1
681 fileio.__init__(f2.fileno(), closefd=False)
682 f1.readline()
683 # .close() must not close f2
684 fileio.close()
685 f2.readline()
686
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300687 def test_nonbuffered_textio(self):
688 with warnings.catch_warnings(record=True) as recorded:
689 with self.assertRaises(ValueError):
690 self.open(support.TESTFN, 'w', buffering=0)
691 support.gc_collect()
692 self.assertEqual(recorded, [])
693
694 def test_invalid_newline(self):
695 with warnings.catch_warnings(record=True) as recorded:
696 with self.assertRaises(ValueError):
697 self.open(support.TESTFN, 'w', newline='invalid')
698 support.gc_collect()
699 self.assertEqual(recorded, [])
700
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200701
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000702class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200703
704 def test_IOBase_finalize(self):
705 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
706 # class which inherits IOBase and an object of this class are caught
707 # in a reference cycle and close() is already in the method cache.
708 class MyIO(self.IOBase):
709 def close(self):
710 pass
711
712 # create an instance to populate the method cache
713 MyIO()
714 obj = MyIO()
715 obj.obj = obj
716 wr = weakref.ref(obj)
717 del MyIO
718 del obj
719 support.gc_collect()
720 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000721
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000722class PyIOTest(IOTest):
723 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000724
Guido van Rossuma9e20242007-03-08 00:43:48 +0000725
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000726class CommonBufferedTests:
727 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
728
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000729 def test_detach(self):
730 raw = self.MockRawIO()
731 buf = self.tp(raw)
732 self.assertIs(buf.detach(), raw)
733 self.assertRaises(ValueError, buf.detach)
734
Benjamin Peterson10e76b62014-12-21 20:51:50 -0600735 repr(buf) # Should still work
736
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000737 def test_fileno(self):
738 rawio = self.MockRawIO()
739 bufio = self.tp(rawio)
740
Ezio Melottib3aedd42010-11-20 19:04:17 +0000741 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000742
Zachary Ware9fe6d862013-12-08 00:20:35 -0600743 @unittest.skip('test having existential crisis')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000744 def test_no_fileno(self):
745 # XXX will we always have fileno() function? If so, kill
746 # this test. Else, write it.
747 pass
748
749 def test_invalid_args(self):
750 rawio = self.MockRawIO()
751 bufio = self.tp(rawio)
752 # Invalid whence
753 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200754 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000755
756 def test_override_destructor(self):
757 tp = self.tp
758 record = []
759 class MyBufferedIO(tp):
760 def __del__(self):
761 record.append(1)
762 try:
763 f = super().__del__
764 except AttributeError:
765 pass
766 else:
767 f()
768 def close(self):
769 record.append(2)
770 super().close()
771 def flush(self):
772 record.append(3)
773 super().flush()
774 rawio = self.MockRawIO()
775 bufio = MyBufferedIO(rawio)
776 writable = bufio.writable()
777 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000778 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000779 if writable:
780 self.assertEqual(record, [1, 2, 3])
781 else:
782 self.assertEqual(record, [1, 2])
783
784 def test_context_manager(self):
785 # Test usability as a context manager
786 rawio = self.MockRawIO()
787 bufio = self.tp(rawio)
788 def _with():
789 with bufio:
790 pass
791 _with()
792 # bufio should now be closed, and using it a second time should raise
793 # a ValueError.
794 self.assertRaises(ValueError, _with)
795
796 def test_error_through_destructor(self):
797 # Test that the exception state is not modified by a destructor,
798 # even if close() fails.
799 rawio = self.CloseFailureIO()
800 def f():
801 self.tp(rawio).xyzzy
802 with support.captured_output("stderr") as s:
803 self.assertRaises(AttributeError, f)
804 s = s.getvalue().strip()
805 if s:
806 # The destructor *may* have printed an unraisable error, check it
807 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200808 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000809 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000810
Antoine Pitrou716c4442009-05-23 19:04:03 +0000811 def test_repr(self):
812 raw = self.MockRawIO()
813 b = self.tp(raw)
814 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
815 self.assertEqual(repr(b), "<%s>" % clsname)
816 raw.name = "dummy"
817 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
818 raw.name = b"dummy"
819 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
820
Antoine Pitrou6be88762010-05-03 16:48:20 +0000821 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200822 # Test that buffered file is closed despite failed flush
823 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +0000824 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200825 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000826 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200827 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200828 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000829 raw.flush = bad_flush
830 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200831 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600832 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200833 self.assertTrue(raw.closed)
834 self.assertTrue(closed) # flush() called
835 self.assertFalse(closed[0]) # flush() called before file closed
836 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200837 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -0600838
839 def test_close_error_on_close(self):
840 raw = self.MockRawIO()
841 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200842 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -0600843 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200844 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -0600845 raw.close = bad_close
846 b = self.tp(raw)
847 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200848 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600849 b.close()
850 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +0300851 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -0600852 self.assertEqual(err.exception.__context__.args, ('flush',))
853 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000854
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +0300855 def test_nonnormalized_close_error_on_close(self):
856 # Issue #21677
857 raw = self.MockRawIO()
858 def bad_flush():
859 raise non_existing_flush
860 def bad_close():
861 raise non_existing_close
862 raw.close = bad_close
863 b = self.tp(raw)
864 b.flush = bad_flush
865 with self.assertRaises(NameError) as err: # exception not swallowed
866 b.close()
867 self.assertIn('non_existing_close', str(err.exception))
868 self.assertIsInstance(err.exception.__context__, NameError)
869 self.assertIn('non_existing_flush', str(err.exception.__context__))
870 self.assertFalse(b.closed)
871
Antoine Pitrou6be88762010-05-03 16:48:20 +0000872 def test_multi_close(self):
873 raw = self.MockRawIO()
874 b = self.tp(raw)
875 b.close()
876 b.close()
877 b.close()
878 self.assertRaises(ValueError, b.flush)
879
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000880 def test_unseekable(self):
881 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
882 self.assertRaises(self.UnsupportedOperation, bufio.tell)
883 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
884
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000885 def test_readonly_attributes(self):
886 raw = self.MockRawIO()
887 buf = self.tp(raw)
888 x = self.MockRawIO()
889 with self.assertRaises(AttributeError):
890 buf.raw = x
891
Guido van Rossum78892e42007-04-06 17:31:18 +0000892
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200893class SizeofTest:
894
895 @support.cpython_only
896 def test_sizeof(self):
897 bufsize1 = 4096
898 bufsize2 = 8192
899 rawio = self.MockRawIO()
900 bufio = self.tp(rawio, buffer_size=bufsize1)
901 size = sys.getsizeof(bufio) - bufsize1
902 rawio = self.MockRawIO()
903 bufio = self.tp(rawio, buffer_size=bufsize2)
904 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
905
Jesus Ceadc469452012-10-04 12:37:56 +0200906 @support.cpython_only
907 def test_buffer_freeing(self) :
908 bufsize = 4096
909 rawio = self.MockRawIO()
910 bufio = self.tp(rawio, buffer_size=bufsize)
911 size = sys.getsizeof(bufio) - bufsize
912 bufio.close()
913 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200914
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000915class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
916 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000917
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000918 def test_constructor(self):
919 rawio = self.MockRawIO([b"abc"])
920 bufio = self.tp(rawio)
921 bufio.__init__(rawio)
922 bufio.__init__(rawio, buffer_size=1024)
923 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000924 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000925 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
926 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
927 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
928 rawio = self.MockRawIO([b"abc"])
929 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000930 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000931
Serhiy Storchaka61e24932014-02-12 10:52:35 +0200932 def test_uninitialized(self):
933 bufio = self.tp.__new__(self.tp)
934 del bufio
935 bufio = self.tp.__new__(self.tp)
936 self.assertRaisesRegex((ValueError, AttributeError),
937 'uninitialized|has no attribute',
938 bufio.read, 0)
939 bufio.__init__(self.MockRawIO())
940 self.assertEqual(bufio.read(0), b'')
941
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000942 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000943 for arg in (None, 7):
944 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
945 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000946 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000947 # Invalid args
948 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000949
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000950 def test_read1(self):
951 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
952 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000953 self.assertEqual(b"a", bufio.read(1))
954 self.assertEqual(b"b", bufio.read1(1))
955 self.assertEqual(rawio._reads, 1)
956 self.assertEqual(b"c", bufio.read1(100))
957 self.assertEqual(rawio._reads, 1)
958 self.assertEqual(b"d", bufio.read1(100))
959 self.assertEqual(rawio._reads, 2)
960 self.assertEqual(b"efg", bufio.read1(100))
961 self.assertEqual(rawio._reads, 3)
962 self.assertEqual(b"", bufio.read1(100))
963 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000964 # Invalid args
965 self.assertRaises(ValueError, bufio.read1, -1)
966
967 def test_readinto(self):
968 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
969 bufio = self.tp(rawio)
970 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000971 self.assertEqual(bufio.readinto(b), 2)
972 self.assertEqual(b, b"ab")
973 self.assertEqual(bufio.readinto(b), 2)
974 self.assertEqual(b, b"cd")
975 self.assertEqual(bufio.readinto(b), 2)
976 self.assertEqual(b, b"ef")
977 self.assertEqual(bufio.readinto(b), 1)
978 self.assertEqual(b, b"gf")
979 self.assertEqual(bufio.readinto(b), 0)
980 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +0200981 rawio = self.MockRawIO((b"abc", None))
982 bufio = self.tp(rawio)
983 self.assertEqual(bufio.readinto(b), 2)
984 self.assertEqual(b, b"ab")
985 self.assertEqual(bufio.readinto(b), 1)
986 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000987
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000988 def test_readlines(self):
989 def bufio():
990 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
991 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000992 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
993 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
994 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000995
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000996 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000997 data = b"abcdefghi"
998 dlen = len(data)
999
1000 tests = [
1001 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1002 [ 100, [ 3, 3, 3], [ dlen ] ],
1003 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1004 ]
1005
1006 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001007 rawio = self.MockFileIO(data)
1008 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001009 pos = 0
1010 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001011 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001012 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001013 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001014 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001015
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001016 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001017 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001018 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1019 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001020 self.assertEqual(b"abcd", bufio.read(6))
1021 self.assertEqual(b"e", bufio.read(1))
1022 self.assertEqual(b"fg", bufio.read())
1023 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001024 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001025 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001026
Victor Stinnera80987f2011-05-25 22:47:16 +02001027 rawio = self.MockRawIO((b"a", None, None))
1028 self.assertEqual(b"a", rawio.readall())
1029 self.assertIsNone(rawio.readall())
1030
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001031 def test_read_past_eof(self):
1032 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1033 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001034
Ezio Melottib3aedd42010-11-20 19:04:17 +00001035 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001036
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001037 def test_read_all(self):
1038 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1039 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001040
Ezio Melottib3aedd42010-11-20 19:04:17 +00001041 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001042
Victor Stinner45df8202010-04-28 22:31:17 +00001043 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001044 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001045 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001046 try:
1047 # Write out many bytes with exactly the same number of 0's,
1048 # 1's... 255's. This will help us check that concurrent reading
1049 # doesn't duplicate or forget contents.
1050 N = 1000
1051 l = list(range(256)) * N
1052 random.shuffle(l)
1053 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001054 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001055 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001056 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001057 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001058 errors = []
1059 results = []
1060 def f():
1061 try:
1062 # Intra-buffer read then buffer-flushing read
1063 for n in cycle([1, 19]):
1064 s = bufio.read(n)
1065 if not s:
1066 break
1067 # list.append() is atomic
1068 results.append(s)
1069 except Exception as e:
1070 errors.append(e)
1071 raise
1072 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001073 with support.start_threads(threads):
1074 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001075 self.assertFalse(errors,
1076 "the following exceptions were caught: %r" % errors)
1077 s = b''.join(results)
1078 for i in range(256):
1079 c = bytes(bytearray([i]))
1080 self.assertEqual(s.count(c), N)
1081 finally:
1082 support.unlink(support.TESTFN)
1083
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001084 def test_unseekable(self):
1085 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1086 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1087 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1088 bufio.read(1)
1089 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1090 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1091
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001092 def test_misbehaved_io(self):
1093 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1094 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001095 self.assertRaises(OSError, bufio.seek, 0)
1096 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001097
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001098 def test_no_extraneous_read(self):
1099 # Issue #9550; when the raw IO object has satisfied the read request,
1100 # we should not issue any additional reads, otherwise it may block
1101 # (e.g. socket).
1102 bufsize = 16
1103 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1104 rawio = self.MockRawIO([b"x" * n])
1105 bufio = self.tp(rawio, bufsize)
1106 self.assertEqual(bufio.read(n), b"x" * n)
1107 # Simple case: one raw read is enough to satisfy the request.
1108 self.assertEqual(rawio._extraneous_reads, 0,
1109 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1110 # A more complex case where two raw reads are needed to satisfy
1111 # the request.
1112 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1113 bufio = self.tp(rawio, bufsize)
1114 self.assertEqual(bufio.read(n), b"x" * n)
1115 self.assertEqual(rawio._extraneous_reads, 0,
1116 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1117
1118
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001119class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001120 tp = io.BufferedReader
1121
1122 def test_constructor(self):
1123 BufferedReaderTest.test_constructor(self)
1124 # The allocation can succeed on 32-bit builds, e.g. with more
1125 # than 2GB RAM and a 64-bit kernel.
1126 if sys.maxsize > 0x7FFFFFFF:
1127 rawio = self.MockRawIO()
1128 bufio = self.tp(rawio)
1129 self.assertRaises((OverflowError, MemoryError, ValueError),
1130 bufio.__init__, rawio, sys.maxsize)
1131
1132 def test_initialization(self):
1133 rawio = self.MockRawIO([b"abc"])
1134 bufio = self.tp(rawio)
1135 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1136 self.assertRaises(ValueError, bufio.read)
1137 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1138 self.assertRaises(ValueError, bufio.read)
1139 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1140 self.assertRaises(ValueError, bufio.read)
1141
1142 def test_misbehaved_io_read(self):
1143 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1144 bufio = self.tp(rawio)
1145 # _pyio.BufferedReader seems to implement reading different, so that
1146 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001147 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001148
1149 def test_garbage_collection(self):
1150 # C BufferedReader objects are collected.
1151 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001152 with support.check_warnings(('', ResourceWarning)):
1153 rawio = self.FileIO(support.TESTFN, "w+b")
1154 f = self.tp(rawio)
1155 f.f = f
1156 wr = weakref.ref(f)
1157 del f
1158 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001159 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001160
R David Murray67bfe802013-02-23 21:51:05 -05001161 def test_args_error(self):
1162 # Issue #17275
1163 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1164 self.tp(io.BytesIO(), 1024, 1024, 1024)
1165
1166
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001167class PyBufferedReaderTest(BufferedReaderTest):
1168 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001169
Guido van Rossuma9e20242007-03-08 00:43:48 +00001170
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001171class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1172 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001173
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001174 def test_constructor(self):
1175 rawio = self.MockRawIO()
1176 bufio = self.tp(rawio)
1177 bufio.__init__(rawio)
1178 bufio.__init__(rawio, buffer_size=1024)
1179 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001180 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001181 bufio.flush()
1182 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1183 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1184 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1185 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001186 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001187 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001188 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001189
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001190 def test_uninitialized(self):
1191 bufio = self.tp.__new__(self.tp)
1192 del bufio
1193 bufio = self.tp.__new__(self.tp)
1194 self.assertRaisesRegex((ValueError, AttributeError),
1195 'uninitialized|has no attribute',
1196 bufio.write, b'')
1197 bufio.__init__(self.MockRawIO())
1198 self.assertEqual(bufio.write(b''), 0)
1199
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001200 def test_detach_flush(self):
1201 raw = self.MockRawIO()
1202 buf = self.tp(raw)
1203 buf.write(b"howdy!")
1204 self.assertFalse(raw._write_stack)
1205 buf.detach()
1206 self.assertEqual(raw._write_stack, [b"howdy!"])
1207
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001208 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001209 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001210 writer = self.MockRawIO()
1211 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001212 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001213 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001214
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001215 def test_write_overflow(self):
1216 writer = self.MockRawIO()
1217 bufio = self.tp(writer, 8)
1218 contents = b"abcdefghijklmnop"
1219 for n in range(0, len(contents), 3):
1220 bufio.write(contents[n:n+3])
1221 flushed = b"".join(writer._write_stack)
1222 # At least (total - 8) bytes were implicitly flushed, perhaps more
1223 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001224 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001225
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001226 def check_writes(self, intermediate_func):
1227 # Lots of writes, test the flushed output is as expected.
1228 contents = bytes(range(256)) * 1000
1229 n = 0
1230 writer = self.MockRawIO()
1231 bufio = self.tp(writer, 13)
1232 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1233 def gen_sizes():
1234 for size in count(1):
1235 for i in range(15):
1236 yield size
1237 sizes = gen_sizes()
1238 while n < len(contents):
1239 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001240 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001241 intermediate_func(bufio)
1242 n += size
1243 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001244 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001245
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001246 def test_writes(self):
1247 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001248
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001249 def test_writes_and_flushes(self):
1250 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001251
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001252 def test_writes_and_seeks(self):
1253 def _seekabs(bufio):
1254 pos = bufio.tell()
1255 bufio.seek(pos + 1, 0)
1256 bufio.seek(pos - 1, 0)
1257 bufio.seek(pos, 0)
1258 self.check_writes(_seekabs)
1259 def _seekrel(bufio):
1260 pos = bufio.seek(0, 1)
1261 bufio.seek(+1, 1)
1262 bufio.seek(-1, 1)
1263 bufio.seek(pos, 0)
1264 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001265
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001266 def test_writes_and_truncates(self):
1267 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001268
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001269 def test_write_non_blocking(self):
1270 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001271 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001272
Ezio Melottib3aedd42010-11-20 19:04:17 +00001273 self.assertEqual(bufio.write(b"abcd"), 4)
1274 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001275 # 1 byte will be written, the rest will be buffered
1276 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001277 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001278
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001279 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1280 raw.block_on(b"0")
1281 try:
1282 bufio.write(b"opqrwxyz0123456789")
1283 except self.BlockingIOError as e:
1284 written = e.characters_written
1285 else:
1286 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001287 self.assertEqual(written, 16)
1288 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001289 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001290
Ezio Melottib3aedd42010-11-20 19:04:17 +00001291 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001292 s = raw.pop_written()
1293 # Previously buffered bytes were flushed
1294 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001295
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001296 def test_write_and_rewind(self):
1297 raw = io.BytesIO()
1298 bufio = self.tp(raw, 4)
1299 self.assertEqual(bufio.write(b"abcdef"), 6)
1300 self.assertEqual(bufio.tell(), 6)
1301 bufio.seek(0, 0)
1302 self.assertEqual(bufio.write(b"XY"), 2)
1303 bufio.seek(6, 0)
1304 self.assertEqual(raw.getvalue(), b"XYcdef")
1305 self.assertEqual(bufio.write(b"123456"), 6)
1306 bufio.flush()
1307 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001308
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001309 def test_flush(self):
1310 writer = self.MockRawIO()
1311 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001312 bufio.write(b"abc")
1313 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001314 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001315
Antoine Pitrou131a4892012-10-16 22:57:11 +02001316 def test_writelines(self):
1317 l = [b'ab', b'cd', b'ef']
1318 writer = self.MockRawIO()
1319 bufio = self.tp(writer, 8)
1320 bufio.writelines(l)
1321 bufio.flush()
1322 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1323
1324 def test_writelines_userlist(self):
1325 l = UserList([b'ab', b'cd', b'ef'])
1326 writer = self.MockRawIO()
1327 bufio = self.tp(writer, 8)
1328 bufio.writelines(l)
1329 bufio.flush()
1330 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1331
1332 def test_writelines_error(self):
1333 writer = self.MockRawIO()
1334 bufio = self.tp(writer, 8)
1335 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1336 self.assertRaises(TypeError, bufio.writelines, None)
1337 self.assertRaises(TypeError, bufio.writelines, 'abc')
1338
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001339 def test_destructor(self):
1340 writer = self.MockRawIO()
1341 bufio = self.tp(writer, 8)
1342 bufio.write(b"abc")
1343 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001344 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001345 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001346
1347 def test_truncate(self):
1348 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001349 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001350 bufio = self.tp(raw, 8)
1351 bufio.write(b"abcdef")
1352 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001353 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001354 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001355 self.assertEqual(f.read(), b"abc")
1356
Victor Stinner45df8202010-04-28 22:31:17 +00001357 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001358 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001359 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001360 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001361 # Write out many bytes from many threads and test they were
1362 # all flushed.
1363 N = 1000
1364 contents = bytes(range(256)) * N
1365 sizes = cycle([1, 19])
1366 n = 0
1367 queue = deque()
1368 while n < len(contents):
1369 size = next(sizes)
1370 queue.append(contents[n:n+size])
1371 n += size
1372 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001373 # We use a real file object because it allows us to
1374 # exercise situations where the GIL is released before
1375 # writing the buffer to the raw streams. This is in addition
1376 # to concurrency issues due to switching threads in the middle
1377 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001378 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001379 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001380 errors = []
1381 def f():
1382 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001383 while True:
1384 try:
1385 s = queue.popleft()
1386 except IndexError:
1387 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001388 bufio.write(s)
1389 except Exception as e:
1390 errors.append(e)
1391 raise
1392 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001393 with support.start_threads(threads):
1394 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001395 self.assertFalse(errors,
1396 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001397 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001398 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001399 s = f.read()
1400 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001401 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001402 finally:
1403 support.unlink(support.TESTFN)
1404
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001405 def test_misbehaved_io(self):
1406 rawio = self.MisbehavedRawIO()
1407 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001408 self.assertRaises(OSError, bufio.seek, 0)
1409 self.assertRaises(OSError, bufio.tell)
1410 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001411
Florent Xicluna109d5732012-07-07 17:03:22 +02001412 def test_max_buffer_size_removal(self):
1413 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001414 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001415
Benjamin Peterson68623612012-12-20 11:53:11 -06001416 def test_write_error_on_close(self):
1417 raw = self.MockRawIO()
1418 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001419 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001420 raw.write = bad_write
1421 b = self.tp(raw)
1422 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001423 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001424 self.assertTrue(b.closed)
1425
Benjamin Peterson59406a92009-03-26 17:10:29 +00001426
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001427class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001428 tp = io.BufferedWriter
1429
1430 def test_constructor(self):
1431 BufferedWriterTest.test_constructor(self)
1432 # The allocation can succeed on 32-bit builds, e.g. with more
1433 # than 2GB RAM and a 64-bit kernel.
1434 if sys.maxsize > 0x7FFFFFFF:
1435 rawio = self.MockRawIO()
1436 bufio = self.tp(rawio)
1437 self.assertRaises((OverflowError, MemoryError, ValueError),
1438 bufio.__init__, rawio, sys.maxsize)
1439
1440 def test_initialization(self):
1441 rawio = self.MockRawIO()
1442 bufio = self.tp(rawio)
1443 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1444 self.assertRaises(ValueError, bufio.write, b"def")
1445 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1446 self.assertRaises(ValueError, bufio.write, b"def")
1447 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1448 self.assertRaises(ValueError, bufio.write, b"def")
1449
1450 def test_garbage_collection(self):
1451 # C BufferedWriter objects are collected, and collecting them flushes
1452 # all data to disk.
1453 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001454 with support.check_warnings(('', ResourceWarning)):
1455 rawio = self.FileIO(support.TESTFN, "w+b")
1456 f = self.tp(rawio)
1457 f.write(b"123xxx")
1458 f.x = f
1459 wr = weakref.ref(f)
1460 del f
1461 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001462 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001463 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001464 self.assertEqual(f.read(), b"123xxx")
1465
R David Murray67bfe802013-02-23 21:51:05 -05001466 def test_args_error(self):
1467 # Issue #17275
1468 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1469 self.tp(io.BytesIO(), 1024, 1024, 1024)
1470
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001471
1472class PyBufferedWriterTest(BufferedWriterTest):
1473 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001474
Guido van Rossum01a27522007-03-07 01:00:12 +00001475class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001476
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001477 def test_constructor(self):
1478 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001479 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001480
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001481 def test_uninitialized(self):
1482 pair = self.tp.__new__(self.tp)
1483 del pair
1484 pair = self.tp.__new__(self.tp)
1485 self.assertRaisesRegex((ValueError, AttributeError),
1486 'uninitialized|has no attribute',
1487 pair.read, 0)
1488 self.assertRaisesRegex((ValueError, AttributeError),
1489 'uninitialized|has no attribute',
1490 pair.write, b'')
1491 pair.__init__(self.MockRawIO(), self.MockRawIO())
1492 self.assertEqual(pair.read(0), b'')
1493 self.assertEqual(pair.write(b''), 0)
1494
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001495 def test_detach(self):
1496 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1497 self.assertRaises(self.UnsupportedOperation, pair.detach)
1498
Florent Xicluna109d5732012-07-07 17:03:22 +02001499 def test_constructor_max_buffer_size_removal(self):
1500 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001501 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001502
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001503 def test_constructor_with_not_readable(self):
1504 class NotReadable(MockRawIO):
1505 def readable(self):
1506 return False
1507
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001508 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001509
1510 def test_constructor_with_not_writeable(self):
1511 class NotWriteable(MockRawIO):
1512 def writable(self):
1513 return False
1514
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001515 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001516
1517 def test_read(self):
1518 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1519
1520 self.assertEqual(pair.read(3), b"abc")
1521 self.assertEqual(pair.read(1), b"d")
1522 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001523 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1524 self.assertEqual(pair.read(None), b"abc")
1525
1526 def test_readlines(self):
1527 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1528 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1529 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1530 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001531
1532 def test_read1(self):
1533 # .read1() is delegated to the underlying reader object, so this test
1534 # can be shallow.
1535 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1536
1537 self.assertEqual(pair.read1(3), b"abc")
1538
1539 def test_readinto(self):
1540 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1541
1542 data = bytearray(5)
1543 self.assertEqual(pair.readinto(data), 5)
1544 self.assertEqual(data, b"abcde")
1545
1546 def test_write(self):
1547 w = self.MockRawIO()
1548 pair = self.tp(self.MockRawIO(), w)
1549
1550 pair.write(b"abc")
1551 pair.flush()
1552 pair.write(b"def")
1553 pair.flush()
1554 self.assertEqual(w._write_stack, [b"abc", b"def"])
1555
1556 def test_peek(self):
1557 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1558
1559 self.assertTrue(pair.peek(3).startswith(b"abc"))
1560 self.assertEqual(pair.read(3), b"abc")
1561
1562 def test_readable(self):
1563 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1564 self.assertTrue(pair.readable())
1565
1566 def test_writeable(self):
1567 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1568 self.assertTrue(pair.writable())
1569
1570 def test_seekable(self):
1571 # BufferedRWPairs are never seekable, even if their readers and writers
1572 # are.
1573 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1574 self.assertFalse(pair.seekable())
1575
1576 # .flush() is delegated to the underlying writer object and has been
1577 # tested in the test_write method.
1578
1579 def test_close_and_closed(self):
1580 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1581 self.assertFalse(pair.closed)
1582 pair.close()
1583 self.assertTrue(pair.closed)
1584
Serhiy Storchaka7665be62015-03-24 23:21:57 +02001585 def test_reader_close_error_on_close(self):
1586 def reader_close():
1587 reader_non_existing
1588 reader = self.MockRawIO()
1589 reader.close = reader_close
1590 writer = self.MockRawIO()
1591 pair = self.tp(reader, writer)
1592 with self.assertRaises(NameError) as err:
1593 pair.close()
1594 self.assertIn('reader_non_existing', str(err.exception))
1595 self.assertTrue(pair.closed)
1596 self.assertFalse(reader.closed)
1597 self.assertTrue(writer.closed)
1598
1599 def test_writer_close_error_on_close(self):
1600 def writer_close():
1601 writer_non_existing
1602 reader = self.MockRawIO()
1603 writer = self.MockRawIO()
1604 writer.close = writer_close
1605 pair = self.tp(reader, writer)
1606 with self.assertRaises(NameError) as err:
1607 pair.close()
1608 self.assertIn('writer_non_existing', str(err.exception))
1609 self.assertFalse(pair.closed)
1610 self.assertTrue(reader.closed)
1611 self.assertFalse(writer.closed)
1612
1613 def test_reader_writer_close_error_on_close(self):
1614 def reader_close():
1615 reader_non_existing
1616 def writer_close():
1617 writer_non_existing
1618 reader = self.MockRawIO()
1619 reader.close = reader_close
1620 writer = self.MockRawIO()
1621 writer.close = writer_close
1622 pair = self.tp(reader, writer)
1623 with self.assertRaises(NameError) as err:
1624 pair.close()
1625 self.assertIn('reader_non_existing', str(err.exception))
1626 self.assertIsInstance(err.exception.__context__, NameError)
1627 self.assertIn('writer_non_existing', str(err.exception.__context__))
1628 self.assertFalse(pair.closed)
1629 self.assertFalse(reader.closed)
1630 self.assertFalse(writer.closed)
1631
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001632 def test_isatty(self):
1633 class SelectableIsAtty(MockRawIO):
1634 def __init__(self, isatty):
1635 MockRawIO.__init__(self)
1636 self._isatty = isatty
1637
1638 def isatty(self):
1639 return self._isatty
1640
1641 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1642 self.assertFalse(pair.isatty())
1643
1644 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1645 self.assertTrue(pair.isatty())
1646
1647 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1648 self.assertTrue(pair.isatty())
1649
1650 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1651 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001652
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04001653 def test_weakref_clearing(self):
1654 brw = self.tp(self.MockRawIO(), self.MockRawIO())
1655 ref = weakref.ref(brw)
1656 brw = None
1657 ref = None # Shouldn't segfault.
1658
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001659class CBufferedRWPairTest(BufferedRWPairTest):
1660 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001661
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001662class PyBufferedRWPairTest(BufferedRWPairTest):
1663 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001664
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001665
1666class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1667 read_mode = "rb+"
1668 write_mode = "wb+"
1669
1670 def test_constructor(self):
1671 BufferedReaderTest.test_constructor(self)
1672 BufferedWriterTest.test_constructor(self)
1673
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001674 def test_uninitialized(self):
1675 BufferedReaderTest.test_uninitialized(self)
1676 BufferedWriterTest.test_uninitialized(self)
1677
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001678 def test_read_and_write(self):
1679 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001680 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001681
1682 self.assertEqual(b"as", rw.read(2))
1683 rw.write(b"ddd")
1684 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001685 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001686 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001687 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001688
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001689 def test_seek_and_tell(self):
1690 raw = self.BytesIO(b"asdfghjkl")
1691 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001692
Ezio Melottib3aedd42010-11-20 19:04:17 +00001693 self.assertEqual(b"as", rw.read(2))
1694 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001695 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001696 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001697
Antoine Pitroue05565e2011-08-20 14:39:23 +02001698 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001699 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001700 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001701 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001702 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001703 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001704 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001705 self.assertEqual(7, rw.tell())
1706 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001707 rw.flush()
1708 self.assertEqual(b"asdf123fl", raw.getvalue())
1709
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001710 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001711
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001712 def check_flush_and_read(self, read_func):
1713 raw = self.BytesIO(b"abcdefghi")
1714 bufio = self.tp(raw)
1715
Ezio Melottib3aedd42010-11-20 19:04:17 +00001716 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001717 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001718 self.assertEqual(b"ef", read_func(bufio, 2))
1719 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001720 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001721 self.assertEqual(6, bufio.tell())
1722 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001723 raw.seek(0, 0)
1724 raw.write(b"XYZ")
1725 # flush() resets the read buffer
1726 bufio.flush()
1727 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001728 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001729
1730 def test_flush_and_read(self):
1731 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1732
1733 def test_flush_and_readinto(self):
1734 def _readinto(bufio, n=-1):
1735 b = bytearray(n if n >= 0 else 9999)
1736 n = bufio.readinto(b)
1737 return bytes(b[:n])
1738 self.check_flush_and_read(_readinto)
1739
1740 def test_flush_and_peek(self):
1741 def _peek(bufio, n=-1):
1742 # This relies on the fact that the buffer can contain the whole
1743 # raw stream, otherwise peek() can return less.
1744 b = bufio.peek(n)
1745 if n != -1:
1746 b = b[:n]
1747 bufio.seek(len(b), 1)
1748 return b
1749 self.check_flush_and_read(_peek)
1750
1751 def test_flush_and_write(self):
1752 raw = self.BytesIO(b"abcdefghi")
1753 bufio = self.tp(raw)
1754
1755 bufio.write(b"123")
1756 bufio.flush()
1757 bufio.write(b"45")
1758 bufio.flush()
1759 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001760 self.assertEqual(b"12345fghi", raw.getvalue())
1761 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001762
1763 def test_threads(self):
1764 BufferedReaderTest.test_threads(self)
1765 BufferedWriterTest.test_threads(self)
1766
1767 def test_writes_and_peek(self):
1768 def _peek(bufio):
1769 bufio.peek(1)
1770 self.check_writes(_peek)
1771 def _peek(bufio):
1772 pos = bufio.tell()
1773 bufio.seek(-1, 1)
1774 bufio.peek(1)
1775 bufio.seek(pos, 0)
1776 self.check_writes(_peek)
1777
1778 def test_writes_and_reads(self):
1779 def _read(bufio):
1780 bufio.seek(-1, 1)
1781 bufio.read(1)
1782 self.check_writes(_read)
1783
1784 def test_writes_and_read1s(self):
1785 def _read1(bufio):
1786 bufio.seek(-1, 1)
1787 bufio.read1(1)
1788 self.check_writes(_read1)
1789
1790 def test_writes_and_readintos(self):
1791 def _read(bufio):
1792 bufio.seek(-1, 1)
1793 bufio.readinto(bytearray(1))
1794 self.check_writes(_read)
1795
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001796 def test_write_after_readahead(self):
1797 # Issue #6629: writing after the buffer was filled by readahead should
1798 # first rewind the raw stream.
1799 for overwrite_size in [1, 5]:
1800 raw = self.BytesIO(b"A" * 10)
1801 bufio = self.tp(raw, 4)
1802 # Trigger readahead
1803 self.assertEqual(bufio.read(1), b"A")
1804 self.assertEqual(bufio.tell(), 1)
1805 # Overwriting should rewind the raw stream if it needs so
1806 bufio.write(b"B" * overwrite_size)
1807 self.assertEqual(bufio.tell(), overwrite_size + 1)
1808 # If the write size was smaller than the buffer size, flush() and
1809 # check that rewind happens.
1810 bufio.flush()
1811 self.assertEqual(bufio.tell(), overwrite_size + 1)
1812 s = raw.getvalue()
1813 self.assertEqual(s,
1814 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1815
Antoine Pitrou7c404892011-05-13 00:13:33 +02001816 def test_write_rewind_write(self):
1817 # Various combinations of reading / writing / seeking backwards / writing again
1818 def mutate(bufio, pos1, pos2):
1819 assert pos2 >= pos1
1820 # Fill the buffer
1821 bufio.seek(pos1)
1822 bufio.read(pos2 - pos1)
1823 bufio.write(b'\x02')
1824 # This writes earlier than the previous write, but still inside
1825 # the buffer.
1826 bufio.seek(pos1)
1827 bufio.write(b'\x01')
1828
1829 b = b"\x80\x81\x82\x83\x84"
1830 for i in range(0, len(b)):
1831 for j in range(i, len(b)):
1832 raw = self.BytesIO(b)
1833 bufio = self.tp(raw, 100)
1834 mutate(bufio, i, j)
1835 bufio.flush()
1836 expected = bytearray(b)
1837 expected[j] = 2
1838 expected[i] = 1
1839 self.assertEqual(raw.getvalue(), expected,
1840 "failed result for i=%d, j=%d" % (i, j))
1841
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001842 def test_truncate_after_read_or_write(self):
1843 raw = self.BytesIO(b"A" * 10)
1844 bufio = self.tp(raw, 100)
1845 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1846 self.assertEqual(bufio.truncate(), 2)
1847 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1848 self.assertEqual(bufio.truncate(), 4)
1849
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001850 def test_misbehaved_io(self):
1851 BufferedReaderTest.test_misbehaved_io(self)
1852 BufferedWriterTest.test_misbehaved_io(self)
1853
Antoine Pitroue05565e2011-08-20 14:39:23 +02001854 def test_interleaved_read_write(self):
1855 # Test for issue #12213
1856 with self.BytesIO(b'abcdefgh') as raw:
1857 with self.tp(raw, 100) as f:
1858 f.write(b"1")
1859 self.assertEqual(f.read(1), b'b')
1860 f.write(b'2')
1861 self.assertEqual(f.read1(1), b'd')
1862 f.write(b'3')
1863 buf = bytearray(1)
1864 f.readinto(buf)
1865 self.assertEqual(buf, b'f')
1866 f.write(b'4')
1867 self.assertEqual(f.peek(1), b'h')
1868 f.flush()
1869 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1870
1871 with self.BytesIO(b'abc') as raw:
1872 with self.tp(raw, 100) as f:
1873 self.assertEqual(f.read(1), b'a')
1874 f.write(b"2")
1875 self.assertEqual(f.read(1), b'c')
1876 f.flush()
1877 self.assertEqual(raw.getvalue(), b'a2c')
1878
1879 def test_interleaved_readline_write(self):
1880 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1881 with self.tp(raw) as f:
1882 f.write(b'1')
1883 self.assertEqual(f.readline(), b'b\n')
1884 f.write(b'2')
1885 self.assertEqual(f.readline(), b'def\n')
1886 f.write(b'3')
1887 self.assertEqual(f.readline(), b'\n')
1888 f.flush()
1889 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1890
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001891 # You can't construct a BufferedRandom over a non-seekable stream.
1892 test_unseekable = None
1893
R David Murray67bfe802013-02-23 21:51:05 -05001894
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001895class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001896 tp = io.BufferedRandom
1897
1898 def test_constructor(self):
1899 BufferedRandomTest.test_constructor(self)
1900 # The allocation can succeed on 32-bit builds, e.g. with more
1901 # than 2GB RAM and a 64-bit kernel.
1902 if sys.maxsize > 0x7FFFFFFF:
1903 rawio = self.MockRawIO()
1904 bufio = self.tp(rawio)
1905 self.assertRaises((OverflowError, MemoryError, ValueError),
1906 bufio.__init__, rawio, sys.maxsize)
1907
1908 def test_garbage_collection(self):
1909 CBufferedReaderTest.test_garbage_collection(self)
1910 CBufferedWriterTest.test_garbage_collection(self)
1911
R David Murray67bfe802013-02-23 21:51:05 -05001912 def test_args_error(self):
1913 # Issue #17275
1914 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
1915 self.tp(io.BytesIO(), 1024, 1024, 1024)
1916
1917
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001918class PyBufferedRandomTest(BufferedRandomTest):
1919 tp = pyio.BufferedRandom
1920
1921
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001922# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1923# properties:
1924# - A single output character can correspond to many bytes of input.
1925# - The number of input bytes to complete the character can be
1926# undetermined until the last input byte is received.
1927# - The number of input bytes can vary depending on previous input.
1928# - A single input byte can correspond to many characters of output.
1929# - The number of output characters can be undetermined until the
1930# last input byte is received.
1931# - The number of output characters can vary depending on previous input.
1932
1933class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1934 """
1935 For testing seek/tell behavior with a stateful, buffering decoder.
1936
1937 Input is a sequence of words. Words may be fixed-length (length set
1938 by input) or variable-length (period-terminated). In variable-length
1939 mode, extra periods are ignored. Possible words are:
1940 - 'i' followed by a number sets the input length, I (maximum 99).
1941 When I is set to 0, words are space-terminated.
1942 - 'o' followed by a number sets the output length, O (maximum 99).
1943 - Any other word is converted into a word followed by a period on
1944 the output. The output word consists of the input word truncated
1945 or padded out with hyphens to make its length equal to O. If O
1946 is 0, the word is output verbatim without truncating or padding.
1947 I and O are initially set to 1. When I changes, any buffered input is
1948 re-scanned according to the new I. EOF also terminates the last word.
1949 """
1950
1951 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001952 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001953 self.reset()
1954
1955 def __repr__(self):
1956 return '<SID %x>' % id(self)
1957
1958 def reset(self):
1959 self.i = 1
1960 self.o = 1
1961 self.buffer = bytearray()
1962
1963 def getstate(self):
1964 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1965 return bytes(self.buffer), i*100 + o
1966
1967 def setstate(self, state):
1968 buffer, io = state
1969 self.buffer = bytearray(buffer)
1970 i, o = divmod(io, 100)
1971 self.i, self.o = i ^ 1, o ^ 1
1972
1973 def decode(self, input, final=False):
1974 output = ''
1975 for b in input:
1976 if self.i == 0: # variable-length, terminated with period
1977 if b == ord('.'):
1978 if self.buffer:
1979 output += self.process_word()
1980 else:
1981 self.buffer.append(b)
1982 else: # fixed-length, terminate after self.i bytes
1983 self.buffer.append(b)
1984 if len(self.buffer) == self.i:
1985 output += self.process_word()
1986 if final and self.buffer: # EOF terminates the last word
1987 output += self.process_word()
1988 return output
1989
1990 def process_word(self):
1991 output = ''
1992 if self.buffer[0] == ord('i'):
1993 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1994 elif self.buffer[0] == ord('o'):
1995 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1996 else:
1997 output = self.buffer.decode('ascii')
1998 if len(output) < self.o:
1999 output += '-'*self.o # pad out with hyphens
2000 if self.o:
2001 output = output[:self.o] # truncate to output length
2002 output += '.'
2003 self.buffer = bytearray()
2004 return output
2005
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002006 codecEnabled = False
2007
2008 @classmethod
2009 def lookupTestDecoder(cls, name):
2010 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002011 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002012 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002013 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002014 incrementalencoder=None,
2015 streamreader=None, streamwriter=None,
2016 incrementaldecoder=cls)
2017
2018# Register the previous decoder for testing.
2019# Disabled by default, tests will enable it.
2020codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2021
2022
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002023class StatefulIncrementalDecoderTest(unittest.TestCase):
2024 """
2025 Make sure the StatefulIncrementalDecoder actually works.
2026 """
2027
2028 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002029 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002030 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002031 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002032 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002033 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002034 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002035 # I=0, O=6 (variable-length input, fixed-length output)
2036 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2037 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002038 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002039 # I=6, O=3 (fixed-length input > fixed-length output)
2040 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2041 # I=0, then 3; O=29, then 15 (with longer output)
2042 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2043 'a----------------------------.' +
2044 'b----------------------------.' +
2045 'cde--------------------------.' +
2046 'abcdefghijabcde.' +
2047 'a.b------------.' +
2048 '.c.------------.' +
2049 'd.e------------.' +
2050 'k--------------.' +
2051 'l--------------.' +
2052 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002053 ]
2054
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002055 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002056 # Try a few one-shot test cases.
2057 for input, eof, output in self.test_cases:
2058 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002059 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002060
2061 # Also test an unfinished decode, followed by forcing EOF.
2062 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002063 self.assertEqual(d.decode(b'oiabcd'), '')
2064 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002065
2066class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002067
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002068 def setUp(self):
2069 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2070 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002071 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002072
Guido van Rossumd0712812007-04-11 16:32:43 +00002073 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002074 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002075
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002076 def test_constructor(self):
2077 r = self.BytesIO(b"\xc3\xa9\n\n")
2078 b = self.BufferedReader(r, 1000)
2079 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002080 t.__init__(b, encoding="latin-1", newline="\r\n")
2081 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002082 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002083 t.__init__(b, encoding="utf-8", line_buffering=True)
2084 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002085 self.assertEqual(t.line_buffering, True)
2086 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002087 self.assertRaises(TypeError, t.__init__, b, newline=42)
2088 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2089
Nick Coghlana9b15242014-02-04 22:11:18 +10002090 def test_non_text_encoding_codecs_are_rejected(self):
2091 # Ensure the constructor complains if passed a codec that isn't
2092 # marked as a text encoding
2093 # http://bugs.python.org/issue20404
2094 r = self.BytesIO()
2095 b = self.BufferedWriter(r)
2096 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2097 self.TextIOWrapper(b, encoding="hex")
2098
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002099 def test_detach(self):
2100 r = self.BytesIO()
2101 b = self.BufferedWriter(r)
2102 t = self.TextIOWrapper(b)
2103 self.assertIs(t.detach(), b)
2104
2105 t = self.TextIOWrapper(b, encoding="ascii")
2106 t.write("howdy")
2107 self.assertFalse(r.getvalue())
2108 t.detach()
2109 self.assertEqual(r.getvalue(), b"howdy")
2110 self.assertRaises(ValueError, t.detach)
2111
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002112 # Operations independent of the detached stream should still work
2113 repr(t)
2114 self.assertEqual(t.encoding, "ascii")
2115 self.assertEqual(t.errors, "strict")
2116 self.assertFalse(t.line_buffering)
2117
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002118 def test_repr(self):
2119 raw = self.BytesIO("hello".encode("utf-8"))
2120 b = self.BufferedReader(raw)
2121 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002122 modname = self.TextIOWrapper.__module__
2123 self.assertEqual(repr(t),
2124 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2125 raw.name = "dummy"
2126 self.assertEqual(repr(t),
2127 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002128 t.mode = "r"
2129 self.assertEqual(repr(t),
2130 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002131 raw.name = b"dummy"
2132 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002133 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002134
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002135 t.buffer.detach()
2136 repr(t) # Should not raise an exception
2137
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002138 def test_line_buffering(self):
2139 r = self.BytesIO()
2140 b = self.BufferedWriter(r, 1000)
2141 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002142 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002143 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002144 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002145 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002146 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002147 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002148
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002149 def test_default_encoding(self):
2150 old_environ = dict(os.environ)
2151 try:
2152 # try to get a user preferred encoding different than the current
2153 # locale encoding to check that TextIOWrapper() uses the current
2154 # locale encoding and not the user preferred encoding
2155 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2156 if key in os.environ:
2157 del os.environ[key]
2158
2159 current_locale_encoding = locale.getpreferredencoding(False)
2160 b = self.BytesIO()
2161 t = self.TextIOWrapper(b)
2162 self.assertEqual(t.encoding, current_locale_encoding)
2163 finally:
2164 os.environ.clear()
2165 os.environ.update(old_environ)
2166
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002167 @support.cpython_only
Serhiy Storchaka78980432013-01-15 01:12:17 +02002168 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002169 # Issue 15989
2170 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002171 b = self.BytesIO()
2172 b.fileno = lambda: _testcapi.INT_MAX + 1
2173 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2174 b.fileno = lambda: _testcapi.UINT_MAX + 1
2175 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2176
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002177 def test_encoding(self):
2178 # Check the encoding attribute is always set, and valid
2179 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002180 t = self.TextIOWrapper(b, encoding="utf-8")
2181 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002182 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002183 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002184 codecs.lookup(t.encoding)
2185
2186 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002187 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002188 b = self.BytesIO(b"abc\n\xff\n")
2189 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002190 self.assertRaises(UnicodeError, t.read)
2191 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002192 b = self.BytesIO(b"abc\n\xff\n")
2193 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002194 self.assertRaises(UnicodeError, t.read)
2195 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002196 b = self.BytesIO(b"abc\n\xff\n")
2197 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002198 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002199 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002200 b = self.BytesIO(b"abc\n\xff\n")
2201 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002202 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002203
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002204 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002205 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002206 b = self.BytesIO()
2207 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002208 self.assertRaises(UnicodeError, t.write, "\xff")
2209 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002210 b = self.BytesIO()
2211 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002212 self.assertRaises(UnicodeError, t.write, "\xff")
2213 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002214 b = self.BytesIO()
2215 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002216 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002217 t.write("abc\xffdef\n")
2218 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002219 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002220 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002221 b = self.BytesIO()
2222 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002223 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002224 t.write("abc\xffdef\n")
2225 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002226 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002227
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002228 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002229 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2230
2231 tests = [
2232 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002233 [ '', input_lines ],
2234 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2235 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2236 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002237 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002238 encodings = (
2239 'utf-8', 'latin-1',
2240 'utf-16', 'utf-16-le', 'utf-16-be',
2241 'utf-32', 'utf-32-le', 'utf-32-be',
2242 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002243
Guido van Rossum8358db22007-08-18 21:39:55 +00002244 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002245 # character in TextIOWrapper._pending_line.
2246 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002247 # XXX: str.encode() should return bytes
2248 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002249 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002250 for bufsize in range(1, 10):
2251 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002252 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2253 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002254 encoding=encoding)
2255 if do_reads:
2256 got_lines = []
2257 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002258 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002259 if c2 == '':
2260 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002261 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002262 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002263 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002264 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002265
2266 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002267 self.assertEqual(got_line, exp_line)
2268 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002269
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002270 def test_newlines_input(self):
2271 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002272 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2273 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002274 (None, normalized.decode("ascii").splitlines(keepends=True)),
2275 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002276 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2277 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2278 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002279 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002280 buf = self.BytesIO(testdata)
2281 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002282 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002283 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002284 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002285
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002286 def test_newlines_output(self):
2287 testdict = {
2288 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2289 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2290 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2291 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2292 }
2293 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2294 for newline, expected in tests:
2295 buf = self.BytesIO()
2296 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2297 txt.write("AAA\nB")
2298 txt.write("BB\nCCC\n")
2299 txt.write("X\rY\r\nZ")
2300 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002301 self.assertEqual(buf.closed, False)
2302 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002303
2304 def test_destructor(self):
2305 l = []
2306 base = self.BytesIO
2307 class MyBytesIO(base):
2308 def close(self):
2309 l.append(self.getvalue())
2310 base.close(self)
2311 b = MyBytesIO()
2312 t = self.TextIOWrapper(b, encoding="ascii")
2313 t.write("abc")
2314 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002315 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002316 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002317
2318 def test_override_destructor(self):
2319 record = []
2320 class MyTextIO(self.TextIOWrapper):
2321 def __del__(self):
2322 record.append(1)
2323 try:
2324 f = super().__del__
2325 except AttributeError:
2326 pass
2327 else:
2328 f()
2329 def close(self):
2330 record.append(2)
2331 super().close()
2332 def flush(self):
2333 record.append(3)
2334 super().flush()
2335 b = self.BytesIO()
2336 t = MyTextIO(b, encoding="ascii")
2337 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002338 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002339 self.assertEqual(record, [1, 2, 3])
2340
2341 def test_error_through_destructor(self):
2342 # Test that the exception state is not modified by a destructor,
2343 # even if close() fails.
2344 rawio = self.CloseFailureIO()
2345 def f():
2346 self.TextIOWrapper(rawio).xyzzy
2347 with support.captured_output("stderr") as s:
2348 self.assertRaises(AttributeError, f)
2349 s = s.getvalue().strip()
2350 if s:
2351 # The destructor *may* have printed an unraisable error, check it
2352 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002353 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002354 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002355
Guido van Rossum9b76da62007-04-11 01:09:03 +00002356 # Systematic tests of the text I/O API
2357
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002358 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002359 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002360 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002361 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002362 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002363 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002364 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002365 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002366 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002367 self.assertEqual(f.tell(), 0)
2368 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002369 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002370 self.assertEqual(f.seek(0), 0)
2371 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002372 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002373 self.assertEqual(f.read(2), "ab")
2374 self.assertEqual(f.read(1), "c")
2375 self.assertEqual(f.read(1), "")
2376 self.assertEqual(f.read(), "")
2377 self.assertEqual(f.tell(), cookie)
2378 self.assertEqual(f.seek(0), 0)
2379 self.assertEqual(f.seek(0, 2), cookie)
2380 self.assertEqual(f.write("def"), 3)
2381 self.assertEqual(f.seek(cookie), cookie)
2382 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002383 if enc.startswith("utf"):
2384 self.multi_line_test(f, enc)
2385 f.close()
2386
2387 def multi_line_test(self, f, enc):
2388 f.seek(0)
2389 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002390 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002391 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002392 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002393 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002394 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002395 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002396 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002397 wlines.append((f.tell(), line))
2398 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002399 f.seek(0)
2400 rlines = []
2401 while True:
2402 pos = f.tell()
2403 line = f.readline()
2404 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002405 break
2406 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002407 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002408
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002409 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002410 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002411 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002412 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002413 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002414 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002415 p2 = f.tell()
2416 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002417 self.assertEqual(f.tell(), p0)
2418 self.assertEqual(f.readline(), "\xff\n")
2419 self.assertEqual(f.tell(), p1)
2420 self.assertEqual(f.readline(), "\xff\n")
2421 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002422 f.seek(0)
2423 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002424 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002425 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002426 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002427 f.close()
2428
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002429 def test_seeking(self):
2430 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002431 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002432 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002433 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002434 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002435 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002436 suffix = bytes(u_suffix.encode("utf-8"))
2437 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002438 with self.open(support.TESTFN, "wb") as f:
2439 f.write(line*2)
2440 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2441 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002442 self.assertEqual(s, str(prefix, "ascii"))
2443 self.assertEqual(f.tell(), prefix_size)
2444 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002445
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002446 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002447 # Regression test for a specific bug
2448 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002449 with self.open(support.TESTFN, "wb") as f:
2450 f.write(data)
2451 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2452 f._CHUNK_SIZE # Just test that it exists
2453 f._CHUNK_SIZE = 2
2454 f.readline()
2455 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002456
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002457 def test_seek_and_tell(self):
2458 #Test seek/tell using the StatefulIncrementalDecoder.
2459 # Make test faster by doing smaller seeks
2460 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002461
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002462 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002463 """Tell/seek to various points within a data stream and ensure
2464 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002465 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002466 f.write(data)
2467 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002468 f = self.open(support.TESTFN, encoding='test_decoder')
2469 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002470 decoded = f.read()
2471 f.close()
2472
Neal Norwitze2b07052008-03-18 19:52:05 +00002473 for i in range(min_pos, len(decoded) + 1): # seek positions
2474 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002475 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002476 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002477 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002478 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002479 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002480 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002481 f.close()
2482
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002483 # Enable the test decoder.
2484 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002485
2486 # Run the tests.
2487 try:
2488 # Try each test case.
2489 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002490 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002491
2492 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002493 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2494 offset = CHUNK_SIZE - len(input)//2
2495 prefix = b'.'*offset
2496 # Don't bother seeking into the prefix (takes too long).
2497 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002498 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002499
2500 # Ensure our test decoder won't interfere with subsequent tests.
2501 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002502 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002503
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002504 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002505 data = "1234567890"
2506 tests = ("utf-16",
2507 "utf-16-le",
2508 "utf-16-be",
2509 "utf-32",
2510 "utf-32-le",
2511 "utf-32-be")
2512 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002513 buf = self.BytesIO()
2514 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002515 # Check if the BOM is written only once (see issue1753).
2516 f.write(data)
2517 f.write(data)
2518 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002519 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002520 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002521 self.assertEqual(f.read(), data * 2)
2522 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002523
Benjamin Petersona1b49012009-03-31 23:11:32 +00002524 def test_unreadable(self):
2525 class UnReadable(self.BytesIO):
2526 def readable(self):
2527 return False
2528 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002529 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002530
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002531 def test_read_one_by_one(self):
2532 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002533 reads = ""
2534 while True:
2535 c = txt.read(1)
2536 if not c:
2537 break
2538 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002539 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002540
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002541 def test_readlines(self):
2542 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2543 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2544 txt.seek(0)
2545 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2546 txt.seek(0)
2547 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2548
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002549 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002550 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002551 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002552 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002553 reads = ""
2554 while True:
2555 c = txt.read(128)
2556 if not c:
2557 break
2558 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002559 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002560
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002561 def test_writelines(self):
2562 l = ['ab', 'cd', 'ef']
2563 buf = self.BytesIO()
2564 txt = self.TextIOWrapper(buf)
2565 txt.writelines(l)
2566 txt.flush()
2567 self.assertEqual(buf.getvalue(), b'abcdef')
2568
2569 def test_writelines_userlist(self):
2570 l = UserList(['ab', 'cd', 'ef'])
2571 buf = self.BytesIO()
2572 txt = self.TextIOWrapper(buf)
2573 txt.writelines(l)
2574 txt.flush()
2575 self.assertEqual(buf.getvalue(), b'abcdef')
2576
2577 def test_writelines_error(self):
2578 txt = self.TextIOWrapper(self.BytesIO())
2579 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2580 self.assertRaises(TypeError, txt.writelines, None)
2581 self.assertRaises(TypeError, txt.writelines, b'abc')
2582
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002583 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002584 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002585
2586 # read one char at a time
2587 reads = ""
2588 while True:
2589 c = txt.read(1)
2590 if not c:
2591 break
2592 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002593 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002594
2595 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002596 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002597 txt._CHUNK_SIZE = 4
2598
2599 reads = ""
2600 while True:
2601 c = txt.read(4)
2602 if not c:
2603 break
2604 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002605 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002606
2607 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002608 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002609 txt._CHUNK_SIZE = 4
2610
2611 reads = txt.read(4)
2612 reads += txt.read(4)
2613 reads += txt.readline()
2614 reads += txt.readline()
2615 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002616 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002617
2618 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002619 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002620 txt._CHUNK_SIZE = 4
2621
2622 reads = txt.read(4)
2623 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002624 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002625
2626 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002627 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002628 txt._CHUNK_SIZE = 4
2629
2630 reads = txt.read(4)
2631 pos = txt.tell()
2632 txt.seek(0)
2633 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002634 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002635
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002636 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002637 buffer = self.BytesIO(self.testdata)
2638 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002639
2640 self.assertEqual(buffer.seekable(), txt.seekable())
2641
Antoine Pitroue4501852009-05-14 18:55:55 +00002642 def test_append_bom(self):
2643 # The BOM is not written again when appending to a non-empty file
2644 filename = support.TESTFN
2645 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2646 with self.open(filename, 'w', encoding=charset) as f:
2647 f.write('aaa')
2648 pos = f.tell()
2649 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002650 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002651
2652 with self.open(filename, 'a', encoding=charset) as f:
2653 f.write('xxx')
2654 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002655 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002656
2657 def test_seek_bom(self):
2658 # Same test, but when seeking manually
2659 filename = support.TESTFN
2660 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2661 with self.open(filename, 'w', encoding=charset) as f:
2662 f.write('aaa')
2663 pos = f.tell()
2664 with self.open(filename, 'r+', encoding=charset) as f:
2665 f.seek(pos)
2666 f.write('zzz')
2667 f.seek(0)
2668 f.write('bbb')
2669 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002670 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002671
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02002672 def test_seek_append_bom(self):
2673 # Same test, but first seek to the start and then to the end
2674 filename = support.TESTFN
2675 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2676 with self.open(filename, 'w', encoding=charset) as f:
2677 f.write('aaa')
2678 with self.open(filename, 'a', encoding=charset) as f:
2679 f.seek(0)
2680 f.seek(0, self.SEEK_END)
2681 f.write('xxx')
2682 with self.open(filename, 'rb') as f:
2683 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2684
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002685 def test_errors_property(self):
2686 with self.open(support.TESTFN, "w") as f:
2687 self.assertEqual(f.errors, "strict")
2688 with self.open(support.TESTFN, "w", errors="replace") as f:
2689 self.assertEqual(f.errors, "replace")
2690
Brett Cannon31f59292011-02-21 19:29:56 +00002691 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002692 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002693 def test_threads_write(self):
2694 # Issue6750: concurrent writes could duplicate data
2695 event = threading.Event()
2696 with self.open(support.TESTFN, "w", buffering=1) as f:
2697 def run(n):
2698 text = "Thread%03d\n" % n
2699 event.wait()
2700 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03002701 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002702 for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03002703 with support.start_threads(threads, event.set):
2704 time.sleep(0.02)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002705 with self.open(support.TESTFN) as f:
2706 content = f.read()
2707 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002708 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002709
Antoine Pitrou6be88762010-05-03 16:48:20 +00002710 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002711 # Test that text file is closed despite failed flush
2712 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00002713 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002714 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00002715 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002716 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002717 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002718 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002719 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002720 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02002721 self.assertTrue(txt.buffer.closed)
2722 self.assertTrue(closed) # flush() called
2723 self.assertFalse(closed[0]) # flush() called before file closed
2724 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02002725 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00002726
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03002727 def test_close_error_on_close(self):
2728 buffer = self.BytesIO(self.testdata)
2729 def bad_flush():
2730 raise OSError('flush')
2731 def bad_close():
2732 raise OSError('close')
2733 buffer.close = bad_close
2734 txt = self.TextIOWrapper(buffer, encoding="ascii")
2735 txt.flush = bad_flush
2736 with self.assertRaises(OSError) as err: # exception not swallowed
2737 txt.close()
2738 self.assertEqual(err.exception.args, ('close',))
2739 self.assertIsInstance(err.exception.__context__, OSError)
2740 self.assertEqual(err.exception.__context__.args, ('flush',))
2741 self.assertFalse(txt.closed)
2742
2743 def test_nonnormalized_close_error_on_close(self):
2744 # Issue #21677
2745 buffer = self.BytesIO(self.testdata)
2746 def bad_flush():
2747 raise non_existing_flush
2748 def bad_close():
2749 raise non_existing_close
2750 buffer.close = bad_close
2751 txt = self.TextIOWrapper(buffer, encoding="ascii")
2752 txt.flush = bad_flush
2753 with self.assertRaises(NameError) as err: # exception not swallowed
2754 txt.close()
2755 self.assertIn('non_existing_close', str(err.exception))
2756 self.assertIsInstance(err.exception.__context__, NameError)
2757 self.assertIn('non_existing_flush', str(err.exception.__context__))
2758 self.assertFalse(txt.closed)
2759
Antoine Pitrou6be88762010-05-03 16:48:20 +00002760 def test_multi_close(self):
2761 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2762 txt.close()
2763 txt.close()
2764 txt.close()
2765 self.assertRaises(ValueError, txt.flush)
2766
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002767 def test_unseekable(self):
2768 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2769 self.assertRaises(self.UnsupportedOperation, txt.tell)
2770 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2771
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002772 def test_readonly_attributes(self):
2773 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2774 buf = self.BytesIO(self.testdata)
2775 with self.assertRaises(AttributeError):
2776 txt.buffer = buf
2777
Antoine Pitroue96ec682011-07-23 21:46:35 +02002778 def test_rawio(self):
2779 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2780 # that subprocess.Popen() can have the required unbuffered
2781 # semantics with universal_newlines=True.
2782 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2783 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2784 # Reads
2785 self.assertEqual(txt.read(4), 'abcd')
2786 self.assertEqual(txt.readline(), 'efghi\n')
2787 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2788
2789 def test_rawio_write_through(self):
2790 # Issue #12591: with write_through=True, writes don't need a flush
2791 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2792 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2793 write_through=True)
2794 txt.write('1')
2795 txt.write('23\n4')
2796 txt.write('5')
2797 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2798
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02002799 def test_bufio_write_through(self):
2800 # Issue #21396: write_through=True doesn't force a flush()
2801 # on the underlying binary buffered object.
2802 flush_called, write_called = [], []
2803 class BufferedWriter(self.BufferedWriter):
2804 def flush(self, *args, **kwargs):
2805 flush_called.append(True)
2806 return super().flush(*args, **kwargs)
2807 def write(self, *args, **kwargs):
2808 write_called.append(True)
2809 return super().write(*args, **kwargs)
2810
2811 rawio = self.BytesIO()
2812 data = b"a"
2813 bufio = BufferedWriter(rawio, len(data)*2)
2814 textio = self.TextIOWrapper(bufio, encoding='ascii',
2815 write_through=True)
2816 # write to the buffered io but don't overflow the buffer
2817 text = data.decode('ascii')
2818 textio.write(text)
2819
2820 # buffer.flush is not called with write_through=True
2821 self.assertFalse(flush_called)
2822 # buffer.write *is* called with write_through=True
2823 self.assertTrue(write_called)
2824 self.assertEqual(rawio.getvalue(), b"") # no flush
2825
2826 write_called = [] # reset
2827 textio.write(text * 10) # total content is larger than bufio buffer
2828 self.assertTrue(write_called)
2829 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
2830
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002831 def test_read_nonbytes(self):
2832 # Issue #17106
2833 # Crash when underlying read() returns non-bytes
2834 t = self.TextIOWrapper(self.StringIO('a'))
2835 self.assertRaises(TypeError, t.read, 1)
2836 t = self.TextIOWrapper(self.StringIO('a'))
2837 self.assertRaises(TypeError, t.readline)
2838 t = self.TextIOWrapper(self.StringIO('a'))
2839 self.assertRaises(TypeError, t.read)
2840
2841 def test_illegal_decoder(self):
2842 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10002843 # Bypass the early encoding check added in issue 20404
2844 def _make_illegal_wrapper():
2845 quopri = codecs.lookup("quopri")
2846 quopri._is_text_encoding = True
2847 try:
2848 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2849 newline='\n', encoding="quopri")
2850 finally:
2851 quopri._is_text_encoding = False
2852 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002853 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10002854 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002855 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10002856 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002857 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10002858 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002859 self.assertRaises(TypeError, t.read)
2860
Antoine Pitrou712cb732013-12-21 15:51:54 +01002861 def _check_create_at_shutdown(self, **kwargs):
2862 # Issue #20037: creating a TextIOWrapper at shutdown
2863 # shouldn't crash the interpreter.
2864 iomod = self.io.__name__
2865 code = """if 1:
2866 import codecs
2867 import {iomod} as io
2868
2869 # Avoid looking up codecs at shutdown
2870 codecs.lookup('utf-8')
2871
2872 class C:
2873 def __init__(self):
2874 self.buf = io.BytesIO()
2875 def __del__(self):
2876 io.TextIOWrapper(self.buf, **{kwargs})
2877 print("ok")
2878 c = C()
2879 """.format(iomod=iomod, kwargs=kwargs)
2880 return assert_python_ok("-c", code)
2881
2882 def test_create_at_shutdown_without_encoding(self):
2883 rc, out, err = self._check_create_at_shutdown()
2884 if err:
2885 # Can error out with a RuntimeError if the module state
2886 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10002887 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01002888 else:
2889 self.assertEqual("ok", out.decode().strip())
2890
2891 def test_create_at_shutdown_with_encoding(self):
2892 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
2893 errors='strict')
2894 self.assertFalse(err)
2895 self.assertEqual("ok", out.decode().strip())
2896
Benjamin Peterson6c14f232014-11-12 10:19:46 -05002897 def test_issue22849(self):
2898 class F(object):
2899 def readable(self): return True
2900 def writable(self): return True
2901 def seekable(self): return True
2902
2903 for i in range(10):
2904 try:
2905 self.TextIOWrapper(F(), encoding='utf-8')
2906 except Exception:
2907 pass
2908
2909 F.tell = lambda x: 0
2910 t = self.TextIOWrapper(F(), encoding='utf-8')
2911
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002912
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002913class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01002914 io = io
Nick Coghlana9b15242014-02-04 22:11:18 +10002915 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002916
2917 def test_initialization(self):
2918 r = self.BytesIO(b"\xc3\xa9\n\n")
2919 b = self.BufferedReader(r, 1000)
2920 t = self.TextIOWrapper(b)
2921 self.assertRaises(TypeError, t.__init__, b, newline=42)
2922 self.assertRaises(ValueError, t.read)
2923 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2924 self.assertRaises(ValueError, t.read)
2925
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002926 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2927 self.assertRaises(Exception, repr, t)
2928
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002929 def test_garbage_collection(self):
2930 # C TextIOWrapper objects are collected, and collecting them flushes
2931 # all data to disk.
2932 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02002933 with support.check_warnings(('', ResourceWarning)):
2934 rawio = io.FileIO(support.TESTFN, "wb")
2935 b = self.BufferedWriter(rawio)
2936 t = self.TextIOWrapper(b, encoding="ascii")
2937 t.write("456def")
2938 t.x = t
2939 wr = weakref.ref(t)
2940 del t
2941 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002942 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002943 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002944 self.assertEqual(f.read(), b"456def")
2945
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002946 def test_rwpair_cleared_before_textio(self):
2947 # Issue 13070: TextIOWrapper's finalization would crash when called
2948 # after the reference to the underlying BufferedRWPair's writer got
2949 # cleared by the GC.
2950 for i in range(1000):
2951 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2952 t1 = self.TextIOWrapper(b1, encoding="ascii")
2953 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2954 t2 = self.TextIOWrapper(b2, encoding="ascii")
2955 # circular references
2956 t1.buddy = t2
2957 t2.buddy = t1
2958 support.gc_collect()
2959
2960
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002961class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01002962 io = pyio
Serhiy Storchakad667d722014-02-10 19:09:19 +02002963 #shutdown_error = "LookupError: unknown encoding: ascii"
2964 shutdown_error = "TypeError: 'NoneType' object is not iterable"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002965
2966
2967class IncrementalNewlineDecoderTest(unittest.TestCase):
2968
2969 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002970 # UTF-8 specific tests for a newline decoder
2971 def _check_decode(b, s, **kwargs):
2972 # We exercise getstate() / setstate() as well as decode()
2973 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002974 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002975 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002976 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002977
Antoine Pitrou180a3362008-12-14 16:36:46 +00002978 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002979
Antoine Pitrou180a3362008-12-14 16:36:46 +00002980 _check_decode(b'\xe8', "")
2981 _check_decode(b'\xa2', "")
2982 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002983
Antoine Pitrou180a3362008-12-14 16:36:46 +00002984 _check_decode(b'\xe8', "")
2985 _check_decode(b'\xa2', "")
2986 _check_decode(b'\x88', "\u8888")
2987
2988 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002989 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2990
Antoine Pitrou180a3362008-12-14 16:36:46 +00002991 decoder.reset()
2992 _check_decode(b'\n', "\n")
2993 _check_decode(b'\r', "")
2994 _check_decode(b'', "\n", final=True)
2995 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002996
Antoine Pitrou180a3362008-12-14 16:36:46 +00002997 _check_decode(b'\r', "")
2998 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002999
Antoine Pitrou180a3362008-12-14 16:36:46 +00003000 _check_decode(b'\r\r\n', "\n\n")
3001 _check_decode(b'\r', "")
3002 _check_decode(b'\r', "\n")
3003 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003004
Antoine Pitrou180a3362008-12-14 16:36:46 +00003005 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3006 _check_decode(b'\xe8\xa2\x88', "\u8888")
3007 _check_decode(b'\n', "\n")
3008 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3009 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003010
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003011 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003012 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003013 if encoding is not None:
3014 encoder = codecs.getincrementalencoder(encoding)()
3015 def _decode_bytewise(s):
3016 # Decode one byte at a time
3017 for b in encoder.encode(s):
3018 result.append(decoder.decode(bytes([b])))
3019 else:
3020 encoder = None
3021 def _decode_bytewise(s):
3022 # Decode one char at a time
3023 for c in s:
3024 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003025 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003026 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003027 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003028 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003029 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003030 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003031 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003032 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003033 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003034 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003035 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003036 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003037 input = "abc"
3038 if encoder is not None:
3039 encoder.reset()
3040 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003041 self.assertEqual(decoder.decode(input), "abc")
3042 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003043
3044 def test_newline_decoder(self):
3045 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003046 # None meaning the IncrementalNewlineDecoder takes unicode input
3047 # rather than bytes input
3048 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003049 'utf-16', 'utf-16-le', 'utf-16-be',
3050 'utf-32', 'utf-32-le', 'utf-32-be',
3051 )
3052 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003053 decoder = enc and codecs.getincrementaldecoder(enc)()
3054 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3055 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003056 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003057 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3058 self.check_newline_decoding_utf8(decoder)
3059
Antoine Pitrou66913e22009-03-06 23:40:56 +00003060 def test_newline_bytes(self):
3061 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3062 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003063 self.assertEqual(dec.newlines, None)
3064 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3065 self.assertEqual(dec.newlines, None)
3066 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3067 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003068 dec = self.IncrementalNewlineDecoder(None, translate=False)
3069 _check(dec)
3070 dec = self.IncrementalNewlineDecoder(None, translate=True)
3071 _check(dec)
3072
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003073class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3074 pass
3075
3076class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3077 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003078
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003079
Guido van Rossum01a27522007-03-07 01:00:12 +00003080# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003081
Guido van Rossum5abbf752007-08-27 17:39:33 +00003082class MiscIOTest(unittest.TestCase):
3083
Barry Warsaw40e82462008-11-20 20:14:50 +00003084 def tearDown(self):
3085 support.unlink(support.TESTFN)
3086
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003087 def test___all__(self):
3088 for name in self.io.__all__:
3089 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003090 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00003091 if name == "open":
3092 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003093 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003094 self.assertTrue(issubclass(obj, Exception), name)
3095 elif not name.startswith("SEEK_"):
3096 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003097
Barry Warsaw40e82462008-11-20 20:14:50 +00003098 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003099 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003100 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003101 f.close()
3102
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003103 with support.check_warnings(('', DeprecationWarning)):
3104 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003105 self.assertEqual(f.name, support.TESTFN)
3106 self.assertEqual(f.buffer.name, support.TESTFN)
3107 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3108 self.assertEqual(f.mode, "U")
3109 self.assertEqual(f.buffer.mode, "rb")
3110 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003111 f.close()
3112
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003113 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003114 self.assertEqual(f.mode, "w+")
3115 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3116 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003117
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003118 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003119 self.assertEqual(g.mode, "wb")
3120 self.assertEqual(g.raw.mode, "wb")
3121 self.assertEqual(g.name, f.fileno())
3122 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003123 f.close()
3124 g.close()
3125
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003126 def test_io_after_close(self):
3127 for kwargs in [
3128 {"mode": "w"},
3129 {"mode": "wb"},
3130 {"mode": "w", "buffering": 1},
3131 {"mode": "w", "buffering": 2},
3132 {"mode": "wb", "buffering": 0},
3133 {"mode": "r"},
3134 {"mode": "rb"},
3135 {"mode": "r", "buffering": 1},
3136 {"mode": "r", "buffering": 2},
3137 {"mode": "rb", "buffering": 0},
3138 {"mode": "w+"},
3139 {"mode": "w+b"},
3140 {"mode": "w+", "buffering": 1},
3141 {"mode": "w+", "buffering": 2},
3142 {"mode": "w+b", "buffering": 0},
3143 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003144 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003145 f.close()
3146 self.assertRaises(ValueError, f.flush)
3147 self.assertRaises(ValueError, f.fileno)
3148 self.assertRaises(ValueError, f.isatty)
3149 self.assertRaises(ValueError, f.__iter__)
3150 if hasattr(f, "peek"):
3151 self.assertRaises(ValueError, f.peek, 1)
3152 self.assertRaises(ValueError, f.read)
3153 if hasattr(f, "read1"):
3154 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003155 if hasattr(f, "readall"):
3156 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003157 if hasattr(f, "readinto"):
3158 self.assertRaises(ValueError, f.readinto, bytearray(1024))
3159 self.assertRaises(ValueError, f.readline)
3160 self.assertRaises(ValueError, f.readlines)
3161 self.assertRaises(ValueError, f.seek, 0)
3162 self.assertRaises(ValueError, f.tell)
3163 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003164 self.assertRaises(ValueError, f.write,
3165 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003166 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003167 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003168
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003169 def test_blockingioerror(self):
3170 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003171 class C(str):
3172 pass
3173 c = C("")
3174 b = self.BlockingIOError(1, c)
3175 c.b = b
3176 b.c = c
3177 wr = weakref.ref(c)
3178 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003179 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003180 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003181
3182 def test_abcs(self):
3183 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003184 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3185 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3186 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3187 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003188
3189 def _check_abc_inheritance(self, abcmodule):
3190 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003191 self.assertIsInstance(f, abcmodule.IOBase)
3192 self.assertIsInstance(f, abcmodule.RawIOBase)
3193 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3194 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003195 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003196 self.assertIsInstance(f, abcmodule.IOBase)
3197 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3198 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3199 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003200 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003201 self.assertIsInstance(f, abcmodule.IOBase)
3202 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3203 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3204 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003205
3206 def test_abc_inheritance(self):
3207 # Test implementations inherit from their respective ABCs
3208 self._check_abc_inheritance(self)
3209
3210 def test_abc_inheritance_official(self):
3211 # Test implementations inherit from the official ABCs of the
3212 # baseline "io" module.
3213 self._check_abc_inheritance(io)
3214
Antoine Pitroue033e062010-10-29 10:38:18 +00003215 def _check_warn_on_dealloc(self, *args, **kwargs):
3216 f = open(*args, **kwargs)
3217 r = repr(f)
3218 with self.assertWarns(ResourceWarning) as cm:
3219 f = None
3220 support.gc_collect()
3221 self.assertIn(r, str(cm.warning.args[0]))
3222
3223 def test_warn_on_dealloc(self):
3224 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3225 self._check_warn_on_dealloc(support.TESTFN, "wb")
3226 self._check_warn_on_dealloc(support.TESTFN, "w")
3227
3228 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3229 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003230 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003231 for fd in fds:
3232 try:
3233 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003234 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003235 if e.errno != errno.EBADF:
3236 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003237 self.addCleanup(cleanup_fds)
3238 r, w = os.pipe()
3239 fds += r, w
3240 self._check_warn_on_dealloc(r, *args, **kwargs)
3241 # When using closefd=False, there's no warning
3242 r, w = os.pipe()
3243 fds += r, w
3244 with warnings.catch_warnings(record=True) as recorded:
3245 open(r, *args, closefd=False, **kwargs)
3246 support.gc_collect()
3247 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00003248
3249 def test_warn_on_dealloc_fd(self):
3250 self._check_warn_on_dealloc_fd("rb", buffering=0)
3251 self._check_warn_on_dealloc_fd("rb")
3252 self._check_warn_on_dealloc_fd("r")
3253
3254
Antoine Pitrou243757e2010-11-05 21:15:39 +00003255 def test_pickling(self):
3256 # Pickling file objects is forbidden
3257 for kwargs in [
3258 {"mode": "w"},
3259 {"mode": "wb"},
3260 {"mode": "wb", "buffering": 0},
3261 {"mode": "r"},
3262 {"mode": "rb"},
3263 {"mode": "rb", "buffering": 0},
3264 {"mode": "w+"},
3265 {"mode": "w+b"},
3266 {"mode": "w+b", "buffering": 0},
3267 ]:
3268 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3269 with self.open(support.TESTFN, **kwargs) as f:
3270 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3271
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003272 @unittest.skipUnless(fcntl, 'fcntl required for this test')
3273 def test_nonblock_pipe_write_bigbuf(self):
3274 self._test_nonblock_pipe_write(16*1024)
3275
3276 @unittest.skipUnless(fcntl, 'fcntl required for this test')
3277 def test_nonblock_pipe_write_smallbuf(self):
3278 self._test_nonblock_pipe_write(1024)
3279
3280 def _set_non_blocking(self, fd):
3281 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
3282 self.assertNotEqual(flags, -1)
3283 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
3284 self.assertEqual(res, 0)
3285
3286 def _test_nonblock_pipe_write(self, bufsize):
3287 sent = []
3288 received = []
3289 r, w = os.pipe()
3290 self._set_non_blocking(r)
3291 self._set_non_blocking(w)
3292
3293 # To exercise all code paths in the C implementation we need
3294 # to play with buffer sizes. For instance, if we choose a
3295 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3296 # then we will never get a partial write of the buffer.
3297 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3298 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3299
3300 with rf, wf:
3301 for N in 9999, 73, 7574:
3302 try:
3303 i = 0
3304 while True:
3305 msg = bytes([i % 26 + 97]) * N
3306 sent.append(msg)
3307 wf.write(msg)
3308 i += 1
3309
3310 except self.BlockingIOError as e:
3311 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003312 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003313 sent[-1] = sent[-1][:e.characters_written]
3314 received.append(rf.read())
3315 msg = b'BLOCKED'
3316 wf.write(msg)
3317 sent.append(msg)
3318
3319 while True:
3320 try:
3321 wf.flush()
3322 break
3323 except self.BlockingIOError as e:
3324 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003325 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003326 self.assertEqual(e.characters_written, 0)
3327 received.append(rf.read())
3328
3329 received += iter(rf.read, None)
3330
3331 sent, received = b''.join(sent), b''.join(received)
3332 self.assertTrue(sent == received)
3333 self.assertTrue(wf.closed)
3334 self.assertTrue(rf.closed)
3335
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003336 def test_create_fail(self):
3337 # 'x' mode fails if file is existing
3338 with self.open(support.TESTFN, 'w'):
3339 pass
3340 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3341
3342 def test_create_writes(self):
3343 # 'x' mode opens for writing
3344 with self.open(support.TESTFN, 'xb') as f:
3345 f.write(b"spam")
3346 with self.open(support.TESTFN, 'rb') as f:
3347 self.assertEqual(b"spam", f.read())
3348
Christian Heimes7b648752012-09-10 14:48:43 +02003349 def test_open_allargs(self):
3350 # there used to be a buffer overflow in the parser for rawmode
3351 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3352
3353
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003354class CMiscIOTest(MiscIOTest):
3355 io = io
3356
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03003357 def test_readinto_buffer_overflow(self):
3358 # Issue #18025
3359 class BadReader(self.io.BufferedIOBase):
3360 def read(self, n=-1):
3361 return b'x' * 10**6
3362 bufio = BadReader()
3363 b = bytearray(2)
3364 self.assertRaises(ValueError, bufio.readinto, b)
3365
Antoine Pitrou25f85d42015-04-13 19:41:47 +02003366 @unittest.skipUnless(threading, 'Threading required for this test.')
3367 def check_daemon_threads_shutdown_deadlock(self, stream_name):
3368 # Issue #23309: deadlocks at shutdown should be avoided when a
3369 # daemon thread and the main thread both write to a file.
3370 code = """if 1:
3371 import sys
3372 import time
3373 import threading
3374
3375 file = sys.{stream_name}
3376
3377 def run():
3378 while True:
3379 file.write('.')
3380 file.flush()
3381
3382 thread = threading.Thread(target=run)
3383 thread.daemon = True
3384 thread.start()
3385
3386 time.sleep(0.5)
3387 file.write('!')
3388 file.flush()
3389 """.format_map(locals())
3390 res, _ = run_python_until_end("-c", code)
3391 err = res.err.decode()
3392 if res.rc != 0:
3393 # Failure: should be a fatal error
3394 self.assertIn("Fatal Python error: could not acquire lock "
3395 "for <_io.BufferedWriter name='<{stream_name}>'> "
3396 "at interpreter shutdown, possibly due to "
3397 "daemon threads".format_map(locals()),
3398 err)
3399 else:
3400 self.assertFalse(err.strip('.!'))
3401
3402 def test_daemon_threads_shutdown_stdout_deadlock(self):
3403 self.check_daemon_threads_shutdown_deadlock('stdout')
3404
3405 def test_daemon_threads_shutdown_stderr_deadlock(self):
3406 self.check_daemon_threads_shutdown_deadlock('stderr')
3407
3408
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003409class PyMiscIOTest(MiscIOTest):
3410 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003411
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003412
3413@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3414class SignalsTest(unittest.TestCase):
3415
3416 def setUp(self):
3417 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3418
3419 def tearDown(self):
3420 signal.signal(signal.SIGALRM, self.oldalrm)
3421
3422 def alarm_interrupt(self, sig, frame):
3423 1/0
3424
3425 @unittest.skipUnless(threading, 'Threading required for this test.')
3426 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3427 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003428 invokes the signal handler, and bubbles up the exception raised
3429 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003430 read_results = []
3431 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003432 if hasattr(signal, 'pthread_sigmask'):
3433 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003434 s = os.read(r, 1)
3435 read_results.append(s)
3436 t = threading.Thread(target=_read)
3437 t.daemon = True
3438 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003439 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003440 try:
3441 wio = self.io.open(w, **fdopen_kwargs)
3442 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003443 # Fill the pipe enough that the write will be blocking.
3444 # It will be interrupted by the timer armed above. Since the
3445 # other thread has read one byte, the low-level write will
3446 # return with a successful (partial) result rather than an EINTR.
3447 # The buffered IO layer must check for pending signal
3448 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003449 signal.alarm(1)
3450 try:
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003451 with self.assertRaises(ZeroDivisionError):
3452 wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
Victor Stinner775b2dd2013-07-15 19:53:13 +02003453 finally:
3454 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003455 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003456 # We got one byte, get another one and check that it isn't a
3457 # repeat of the first one.
3458 read_results.append(os.read(r, 1))
3459 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3460 finally:
3461 os.close(w)
3462 os.close(r)
3463 # This is deliberate. If we didn't close the file descriptor
3464 # before closing wio, wio would try to flush its internal
3465 # buffer, and block again.
3466 try:
3467 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003468 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003469 if e.errno != errno.EBADF:
3470 raise
3471
3472 def test_interrupted_write_unbuffered(self):
3473 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3474
3475 def test_interrupted_write_buffered(self):
3476 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3477
Victor Stinner6ab72862014-09-03 23:32:28 +02003478 # Issue #22331: The test hangs on FreeBSD 7.2
3479 @support.requires_freebsd_version(8)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003480 def test_interrupted_write_text(self):
3481 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3482
Brett Cannon31f59292011-02-21 19:29:56 +00003483 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003484 def check_reentrant_write(self, data, **fdopen_kwargs):
3485 def on_alarm(*args):
3486 # Will be called reentrantly from the same thread
3487 wio.write(data)
3488 1/0
3489 signal.signal(signal.SIGALRM, on_alarm)
3490 r, w = os.pipe()
3491 wio = self.io.open(w, **fdopen_kwargs)
3492 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003493 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003494 # Either the reentrant call to wio.write() fails with RuntimeError,
3495 # or the signal handler raises ZeroDivisionError.
3496 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3497 while 1:
3498 for i in range(100):
3499 wio.write(data)
3500 wio.flush()
3501 # Make sure the buffer doesn't fill up and block further writes
3502 os.read(r, len(data) * 100)
3503 exc = cm.exception
3504 if isinstance(exc, RuntimeError):
3505 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3506 finally:
3507 wio.close()
3508 os.close(r)
3509
3510 def test_reentrant_write_buffered(self):
3511 self.check_reentrant_write(b"xy", mode="wb")
3512
3513 def test_reentrant_write_text(self):
3514 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3515
Antoine Pitrou707ce822011-02-25 21:24:11 +00003516 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3517 """Check that a buffered read, when it gets interrupted (either
3518 returning a partial result or EINTR), properly invokes the signal
3519 handler and retries if the latter returned successfully."""
3520 r, w = os.pipe()
3521 fdopen_kwargs["closefd"] = False
3522 def alarm_handler(sig, frame):
3523 os.write(w, b"bar")
3524 signal.signal(signal.SIGALRM, alarm_handler)
3525 try:
3526 rio = self.io.open(r, **fdopen_kwargs)
3527 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003528 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003529 # Expected behaviour:
3530 # - first raw read() returns partial b"foo"
3531 # - second raw read() returns EINTR
3532 # - third raw read() returns b"bar"
3533 self.assertEqual(decode(rio.read(6)), "foobar")
3534 finally:
3535 rio.close()
3536 os.close(w)
3537 os.close(r)
3538
Antoine Pitrou20db5112011-08-19 20:32:34 +02003539 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003540 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3541 mode="rb")
3542
Antoine Pitrou20db5112011-08-19 20:32:34 +02003543 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003544 self.check_interrupted_read_retry(lambda x: x,
3545 mode="r")
3546
3547 @unittest.skipUnless(threading, 'Threading required for this test.')
3548 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3549 """Check that a buffered write, when it gets interrupted (either
3550 returning a partial result or EINTR), properly invokes the signal
3551 handler and retries if the latter returned successfully."""
3552 select = support.import_module("select")
3553 # A quantity that exceeds the buffer size of an anonymous pipe's
3554 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02003555 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00003556 r, w = os.pipe()
3557 fdopen_kwargs["closefd"] = False
3558 # We need a separate thread to read from the pipe and allow the
3559 # write() to finish. This thread is started after the SIGALRM is
3560 # received (forcing a first EINTR in write()).
3561 read_results = []
3562 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003563 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00003564 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003565 try:
3566 while not write_finished:
3567 while r in select.select([r], [], [], 1.0)[0]:
3568 s = os.read(r, 1024)
3569 read_results.append(s)
3570 except BaseException as exc:
3571 nonlocal error
3572 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00003573 t = threading.Thread(target=_read)
3574 t.daemon = True
3575 def alarm1(sig, frame):
3576 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003577 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003578 def alarm2(sig, frame):
3579 t.start()
3580 signal.signal(signal.SIGALRM, alarm1)
3581 try:
3582 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003583 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003584 # Expected behaviour:
3585 # - first raw write() is partial (because of the limited pipe buffer
3586 # and the first alarm)
3587 # - second raw write() returns EINTR (because of the second alarm)
3588 # - subsequent write()s are successful (either partial or complete)
3589 self.assertEqual(N, wio.write(item * N))
3590 wio.flush()
3591 write_finished = True
3592 t.join()
Serhiy Storchaka9db55002015-03-28 20:38:37 +02003593
3594 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003595 self.assertEqual(N, sum(len(x) for x in read_results))
3596 finally:
3597 write_finished = True
3598 os.close(w)
3599 os.close(r)
3600 # This is deliberate. If we didn't close the file descriptor
3601 # before closing wio, wio would try to flush its internal
3602 # buffer, and could block (in case of failure).
3603 try:
3604 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003605 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00003606 if e.errno != errno.EBADF:
3607 raise
3608
Antoine Pitrou20db5112011-08-19 20:32:34 +02003609 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003610 self.check_interrupted_write_retry(b"x", mode="wb")
3611
Antoine Pitrou20db5112011-08-19 20:32:34 +02003612 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003613 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3614
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003615
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003616class CSignalsTest(SignalsTest):
3617 io = io
3618
3619class PySignalsTest(SignalsTest):
3620 io = pyio
3621
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003622 # Handling reentrancy issues would slow down _pyio even more, so the
3623 # tests are disabled.
3624 test_reentrant_write_buffered = None
3625 test_reentrant_write_text = None
3626
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003627
Ezio Melottidaa42c72013-03-23 16:30:16 +02003628def load_tests(*args):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003629 tests = (CIOTest, PyIOTest,
3630 CBufferedReaderTest, PyBufferedReaderTest,
3631 CBufferedWriterTest, PyBufferedWriterTest,
3632 CBufferedRWPairTest, PyBufferedRWPairTest,
3633 CBufferedRandomTest, PyBufferedRandomTest,
3634 StatefulIncrementalDecoderTest,
3635 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3636 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003637 CMiscIOTest, PyMiscIOTest,
3638 CSignalsTest, PySignalsTest,
3639 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003640
3641 # Put the namespaces of the IO module we are testing and some useful mock
3642 # classes in the __dict__ of each test.
3643 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003644 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003645 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3646 c_io_ns = {name : getattr(io, name) for name in all_members}
3647 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3648 globs = globals()
3649 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3650 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3651 # Avoid turning open into a bound method.
3652 py_io_ns["open"] = pyio.OpenWrapper
3653 for test in tests:
3654 if test.__name__.startswith("C"):
3655 for name, obj in c_io_ns.items():
3656 setattr(test, name, obj)
3657 elif test.__name__.startswith("Py"):
3658 for name, obj in py_io_ns.items():
3659 setattr(test, name, obj)
3660
Ezio Melottidaa42c72013-03-23 16:30:16 +02003661 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
3662 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003663
3664if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02003665 unittest.main()