blob: b5506b0ac49ecdaf111b8e282255cf9db634e5d2 [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses
# to test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Antoine Pitrou712cb732013-12-21 15:51:54 +010038from test.script_helper import assert_python_ok
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010047try:
48 import fcntl
49except ImportError:
50 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000052def _default_chunk_size():
53 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000054 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000055 return f._CHUNK_SIZE
56
57
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    Reads are scripted: ``read_stack`` is a sequence of chunks handed out in
    order by readinto().  A ``None`` entry makes readinto() return ``None``
    once.  Everything passed to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0             # number of readinto() calls
        self._extraneous_reads = 0  # calls made after the script ran out

    def write(self, b):
        # Store an immutable copy so later mutation of ``b`` is harmless.
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next scripted chunk (or part of it) into ``buf``.

        Returns the number of bytes copied, ``None`` when the next scripted
        entry is ``None``, or 0 once the script is exhausted.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            del self._read_stack[0]
            return None
        size = len(chunk)
        if size > capacity:
            # Only part of the chunk fits: keep the remainder for next time.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        del self._read_stack[0]
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        return pos
113
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C implementation's RawIOBase."""
116
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the Python implementation's RawIOBase."""
119
120
class MockRawIO(MockRawIOWithoutRead):
    """Scripted raw stream that also provides read().

    read() hands out the scripted entries of ``_read_stack`` whole, one per
    call, ignoring the requested size.
    """

    def read(self, n=None):
        """Return the next scripted entry, or b"" once the script is empty.

        ``n`` is accepted for interface compatibility but not used.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only "script exhausted" is expected here; anything else
            # (including KeyboardInterrupt) must not be swallowed.
            self._extraneous_reads += 1
            return b""
130
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C implementation's RawIOBase."""
133
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the Python implementation's RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000136
Guido van Rossuma9e20242007-03-08 00:43:48 +0000137
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods return deliberately wrong values."""

    def write(self, b):
        # Report twice as many bytes as were actually recorded.
        recorded = super().write(b)
        return recorded * 2

    def read(self, n=None):
        # Hand back the scripted chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        # A negative position is never valid.
        return -123

    def tell(self):
        # A negative position is never valid.
        return -456

    def readinto(self, buf):
        # Fill the buffer normally, then claim five times its capacity.
        super().readinto(buf)
        return 5 * len(buf)
154
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C implementation's RawIOBase."""
157
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the Python implementation's RawIOBase."""
160
161
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() raises OSError.

    The object is flagged as closed before the error is raised, so any
    further close() call returns quietly.
    """

    closed = 0

    def close(self):
        if self.closed:
            return
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C implementation's RawIOBase."""
172
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the Python implementation's RawIOBase."""
175
176
class MockFileIO:
    """Mixin that records the size of every read() and readinto() result.

    It relies on cooperative ``super()`` calls, so it has to be combined
    with a class that actually implements the stream.  ``read_history``
    gets one entry per call: the length of the data returned by read() (or
    ``None`` when read() returned ``None``), or the raw return value of
    readinto().
    """

    def __init__(self, data):
        # Create the log first so it exists before the base initialiser runs.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        result = super().read(n)
        if result is None:
            entry = None
        else:
            entry = len(result)
        self.read_history.append(entry)
        return result

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000192
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO backed by the C implementation's BytesIO."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000195
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO backed by the Python implementation's BytesIO."""
198
199
class MockUnseekableIO:
    """Mixin that turns a stream into an unseekable one.

    seek() and tell() raise ``self.UnsupportedOperation``; the concrete
    subclass is expected to set that attribute to the exception class to use.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
209
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO backed by the C implementation's BytesIO."""

    # Exception class raised by the inherited seek() and tell().
    UnsupportedOperation = io.UnsupportedOperation
212
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO backed by the Python implementation's BytesIO."""

    # Exception class raised by the inherited seek() and tell().
    UnsupportedOperation = pyio.UnsupportedOperation
215
216
class MockNonBlockWriterIO:
    """Raw writer mock that can simulate a write that would block.

    Written data accumulates in ``_write_stack``.  After block_on(char),
    a write() whose data contains ``char`` is cut short just before it; if
    ``char`` is the very first byte, write() returns ``None`` and the
    blocker is cleared.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and reset the record."""
        written = b"".join(self._write_stack)
        self._write_stack.clear()
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record ``b``, honouring the blocker set by block_on().

        Returns the number of bytes accepted, or ``None`` when the blocker
        sits at the very start of ``b``.
        """
        payload = bytes(b)
        if self._blocker_char:
            try:
                cut = payload.index(self._blocker_char)
            except ValueError:
                cut = -1    # blocker not present in this payload
            if cut > 0:
                # write data up to the first blocker
                self._write_stack.append(payload[:cut])
                return cut
            if cut == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)
260
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of the C implementation's RawIOBase."""

    # BlockingIOError class belonging to the same implementation.
    BlockingIOError = io.BlockingIOError
263
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of the Python implementation's RawIOBase."""

    # BlockingIOError class belonging to the same implementation.
    BlockingIOError = pyio.BlockingIOError
266
Guido van Rossuma9e20242007-03-08 00:43:48 +0000267
class IOTest(unittest.TestCase):
    """General io tests, written against an implementation chosen elsewhere.

    The methods reach the implementation under test only through attributes
    of ``self`` (``self.open``, ``self.FileIO``, ``self.BytesIO``,
    ``self.IOBase``, ``self.UnsupportedOperation``, ...).  This class does
    not define those attributes; they have to be supplied from outside
    before the tests are run.
    """

    def setUp(self):
        """Make sure no scratch file is left over from a previous test."""
        support.unlink(support.TESTFN)

    def tearDown(self):
        """Remove the scratch file the test may have created."""
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Drive a writable binary stream through write/seek/tell/truncate.

        Helper for the concrete tests.  On success the stream holds the 12
        bytes "hello world" plus a newline.
        """
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must not move the stream position.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        # Again, truncate() must leave the position alone.
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Drive a readable binary stream through read/readinto/seek/tell.

        Helper for the concrete tests.  The stream must contain "hello world"
        followed by a newline.  When ``buffered`` is true, argument-less
        read() calls are checked as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # A short read leaves the buffer size untouched.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file checks.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Exercise seek/tell/write/truncate/read around ``self.LARGE``.

        Helper for test_large_file_ops; ``f`` must be readable and writable.
        """
        # Real assertions rather than ``assert`` statements, so the
        # preconditions are still checked when running under ``python -O``.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # truncate() must not move the stream position.
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_open_handles_NUL_chars(self):
        """open() must reject file names containing a NUL character."""
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
        self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')

    def test_raw_file_io(self):
        """Run write_ops/read_ops on an unbuffered binary file."""
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        """Run write_ops/read_ops on a buffered binary file."""
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        """Check readline() with and without a size limit."""
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        """Run write_ops/read_ops on an in-memory BytesIO."""
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            support.requires(
                'largefile',
                'test requires %s bytes and a long time to run' % self.LARGE)
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        """The with statement closes the file, with or without an error."""
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        """tell() on a file opened for appending reports the end position."""
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        """Collecting a FileIO subclass runs __del__, close() and flush()."""
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                # Chain to the base __del__ only if the base defines one.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check the __del__/close()/flush() order for a subclass of ``base``."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                # Chain to the base __del__ only if the base defines one.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        """Data written before close() is readable afterwards."""
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        """write() accepts an array and reports its size in bytes."""
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        """closefd=False is rejected when opening by file name."""
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        """Reading from a closed closefd=False wrapper raises ValueError."""
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        """The raw file's closefd attribute reflects how it was opened."""
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Create a reference cycle so only the garbage collector can
            # reclaim the object.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        """An error raised by flush() propagates out of close()."""
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise OSError()
        f.flush = bad_flush
        self.assertRaises(OSError, f.close) # exception not swallowed
        self.assertTrue(f.closed)

    def test_multi_close(self):
        """close() may be called repeatedly; flush() afterwards fails."""
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        """Instances of the basic io types have a __dict__."""
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))

    def test_opener(self):
        """open() uses the descriptor returned by a custom opener."""
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        fd = os.open(support.TESTFN, os.O_RDONLY)
        def opener(path, flags):
            return fd
        with self.open("non-existent", "r", opener=opener) as f:
            self.assertEqual(f.read(), "egg\n")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()

    def test_nonbuffered_textio(self):
        """A rejected unbuffered text open() must not emit any warning."""
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', buffering=0)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_invalid_newline(self):
        """A rejected newline argument must not emit any warning."""
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', newline='invalid')
            support.gc_collect()
        self.assertEqual(recorded, [])
669
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200670
class CIOTest(IOTest):
    """IOTest plus a regression test aimed at the C implementation."""

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        instance = MyIO()
        # Self-reference: the instance can only go away through the
        # garbage collector.
        instance.obj = instance
        ref = weakref.ref(instance)
        del MyIO
        del instance
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000690
class PyIOTest(IOTest):
    """IOTest without any additional tests."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000693
Guido van Rossuma9e20242007-03-08 00:43:48 +0000694
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    # Concrete subclasses supply the class under test as ``self.tp``.

    def test_detach(self):
        # detach() hands back the raw stream; a second detach() fails.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        self.assertIs(buffered.detach(), raw)
        with self.assertRaises(ValueError):
            buffered.detach()

    def test_fileno(self):
        # The test expects the mock raw stream's fileno() to come back as 42.
        bufio = self.tp(self.MockRawIO())
        self.assertEqual(42, bufio.fileno())

    @unittest.skip('test having existential crisis')
    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        bufio = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 9):
            self.assertRaises(ValueError, bufio.seek, 0, bad_whence)

    def test_override_destructor(self):
        # Subclass hooks must run in the order __del__, close, flush (the
        # latter only when the object is writable).
        base = self.tp
        calls = []

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                try:
                    parent_del = super().__del__
                except AttributeError:
                    pass
                else:
                    parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        expected = [1, 2, 3] if writable else [1, 2]
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        bufio = self.tp(self.MockRawIO())

        def enter_and_leave():
            with bufio:
                pass

        enter_and_leave()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        with self.assertRaises(ValueError):
            enter_and_leave()

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()

        def touch_missing_attribute():
            self.tp(rawio).xyzzy

        with support.captured_output("stderr") as stderr:
            self.assertRaises(AttributeError, touch_missing_attribute)
        output = stderr.getvalue().strip()
        if not output:
            return
        # The destructor *may* have printed an unraisable error, check it
        self.assertEqual(len(output.splitlines()), 1)
        self.assertTrue(output.startswith("Exception OSError: "), output)
        self.assertTrue(output.endswith(" ignored"), output)

    def test_repr(self):
        # repr() shows the qualified class name and, once set, the raw name.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(buffered), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        raw = self.MockRawIO()

        def failing_flush():
            raise OSError()

        raw.flush = failing_flush
        buffered = self.tp(raw)
        with self.assertRaises(OSError):  # exception not swallowed
            buffered.close()
        self.assertTrue(buffered.closed)

    def test_close_error_on_close(self):
        # When both flush() and close() fail, the close() error propagates
        # with the flush() error chained as its __context__.
        raw = self.MockRawIO()

        def failing_flush():
            raise OSError('flush')

        def failing_close():
            raise OSError('close')

        raw.close = failing_close
        buffered = self.tp(raw)
        buffered.flush = failing_flush
        with self.assertRaises(OSError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertEqual(error.args, ('close',))
        self.assertIsInstance(error.__context__, OSError)
        self.assertEqual(error.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw = self.MockRawIO()

        # Both helpers deliberately reference undefined names so that a
        # NameError is raised from inside flush() and close().
        def failing_flush():
            raise non_existing_flush

        def failing_close():
            raise non_existing_close

        raw.close = failing_close
        buffered = self.tp(raw)
        buffered.flush = failing_flush
        with self.assertRaises(NameError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertIn('non_existing_close', str(error))
        self.assertIsInstance(error.__context__, NameError)
        self.assertIn('non_existing_flush', str(error.__context__))
        self.assertFalse(buffered.closed)

    def test_multi_close(self):
        # Repeated close() calls are accepted; flush() afterwards is not.
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            bufio.tell()
        with self.assertRaises(self.UnsupportedOperation):
            bufio.seek(0)

    def test_readonly_attributes(self):
        # The ``raw`` attribute cannot be rebound.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
849
Guido van Rossum78892e42007-04-06 17:31:18 +0000850
class SizeofTest:
    # Mixin checking how sys.getsizeof() accounts for the internal buffer.

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must grow by exactly the buffer size difference.
        small, large = 4096, 8192
        first = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(first) - small
        second = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(second), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the buffer no longer counts towards the object size.
        bufsize = 4096
        bufio = self.tp(self.MockRawIO(), buffer_size=bufsize)
        overhead = sys.getsizeof(bufio) - bufsize
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200872
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Mode used by test_threads when reopening the test file for reading.
    read_mode = "rb"

    def test_constructor(self):
        # __init__() may be called repeatedly; invalid buffer sizes raise
        # ValueError and the object is usable after a later valid __init__().
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        for size in (1024, 16):
            bufio.__init__(rawio, buffer_size=size)
        self.assertEqual(b"abc", bufio.read())
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object created through __new__() alone can be deleted, and
        # refuses to read until __init__() has been called.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        with self.assertRaisesRegex((ValueError, AttributeError),
                                    'uninitialized|has no attribute'):
            bufio.read(0)
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        for size in (None, 7):
            bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
            self.assertEqual(b"abcdefg", bufio.read(size))
        # Invalid args
        with self.assertRaises(ValueError):
            bufio.read(-2)

    def test_read1(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        # (expected data, requested size, cumulative number of raw reads)
        steps = (
            (b"b", 1, 1),
            (b"c", 100, 1),
            (b"d", 100, 2),
            (b"efg", 100, 3),
            (b"", 100, 4),
        )
        for expected, size, raw_reads in steps:
            self.assertEqual(expected, bufio.read1(size))
            self.assertEqual(rawio._reads, raw_reads)
        # Invalid args
        with self.assertRaises(ValueError):
            bufio.read1(-1)

    def test_readinto(self):
        # readinto() fills the two-byte target in place; leftover bytes from
        # the previous call stay in the unfilled tail of the target.
        target = bytearray(2)
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        for count_read, content in ((2, b"ab"), (2, b"cd"), (2, b"ef"),
                                    (1, b"gf"), (0, b"gf")):
            self.assertEqual(bufio.readinto(target), count_read)
            self.assertEqual(target, content)
        bufio = self.tp(self.MockRawIO((b"abc", None)))
        for count_read, content in ((2, b"ab"), (1, b"cb")):
            self.assertEqual(bufio.readinto(target), count_read)
            self.assertEqual(target, content)

    def test_readlines(self):
        def fresh_bufio():
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

        everything = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(fresh_bufio().readlines(), everything)
        self.assertEqual(fresh_bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh_bufio().readlines(None), everything)

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each case: buffer size, sizes requested from the buffered reader,
        # sizes expected in the raw stream's read history.
        cases = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, requested, raw_expected in cases:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            offset = 0
            for nbytes in requested:
                chunk = data[offset:offset + nbytes]
                self.assertEqual(bufio.read(nbytes), chunk)
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_expected)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        chunks = (b"abc", b"d", None, b"efg", None, None, None)
        bufio = self.tp(self.MockRawIO(chunks))
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(values)
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode,
                           buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []

                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for size in cycle([1, 19]):
                            chunk = bufio.read(size)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                threads = [threading.Thread(target=reader)
                           for _ in range(20)]
                for thread in threads:
                    thread.start()
                time.sleep(0.02)  # yield
                for thread in threads:
                    thread.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                collected = b''.join(results)
                for value in range(256):
                    self.assertEqual(collected.count(bytes([value])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # Same checks as the common test, repeated after data has been read.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        unsupported = self.UnsupportedOperation
        self.assertRaises(unsupported, bufio.tell)
        self.assertRaises(unsupported, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(unsupported, bufio.seek, 0)
        self.assertRaises(unsupported, bufio.tell)

    def test_misbehaved_io(self):
        bufio = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        with self.assertRaises(OSError):
            bufio.seek(0)
        with self.assertRaises(OSError):
            bufio.tell()

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            layouts = (
                # Simple case: one raw read is enough to satisfy the request.
                [b"x" * n],
                # A more complex case where two raw reads are needed to
                # satisfy the request.
                [b"x" * (n - 1), b"x"],
            )
            for chunks in layouts:
                rawio = self.MockRawIO(chunks)
                bufio = self.tp(rawio, bufsize)
                self.assertEqual(bufio.read(n), b"x" * n)
                self.assertEqual(rawio._extraneous_reads, 0,
                    "failed for {}: {} != 0".format(
                        n, rawio._extraneous_reads))
1078
1079
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the BufferedReader tests against io.BufferedReader and adds
    # checks specific to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__() call, read() must raise ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The test creates TESTFN on disk; remove it once the test is over so
        # it does not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            # Self-reference: only the cycle GC can reclaim the object.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        # assertIsNone reports the surviving object on failure.
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1126
1127
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the BufferedReader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001130
Guido van Rossuma9e20242007-03-08 00:43:48 +00001131
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Mode used by the tests that open the test file for writing.
    write_mode = "wb"

    def test_constructor(self):
        # __init__() may be called repeatedly; invalid buffer sizes raise
        # ValueError and the object is usable after a later valid __init__().
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # An object created through __new__() alone can be deleted, and
        # refuses to write until __init__() has been called.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() must push pending data to the raw stream first.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func is called with the buffered writer after every
        # write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek away and back between writes, with absolute and then relative
        # offsets; the flushed output must be unaffected.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Seeking back and overwriting must patch the earlier data in place.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, but with a UserList instead of a list.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items, a non-iterable and a str are all rejected.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Destroying the writer must flush its pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN on disk; remove it once the test is over so
        # it does not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunks may land in any order, so only per-byte counts are
            # compared.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close) # exception not swallowed
        self.assertTrue(b.closed)
Benjamin Peterson68623612012-12-20 11:53:11 -06001388 self.assertTrue(b.closed)
1389
Benjamin Peterson59406a92009-03-26 17:10:29 +00001390
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriterTest suite against io.BufferedWriter and
    # adds checks specific to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw_stream = self.MockRawIO()
        writer = self.tp(raw_stream)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            writer.__init__(raw_stream, sys.maxsize)

    def test_initialization(self):
        # Every failed re-initialization with a non-positive buffer size
        # must leave the object unusable for writing.
        raw_stream = self.MockRawIO()
        writer = self.tp(raw_stream)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                writer.__init__(raw_stream, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                writer.write(b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        with support.check_warnings(('', ResourceWarning)):
            raw_file = self.FileIO(support.TESTFN, "w+b")
            writer = self.tp(raw_file)
            writer.write(b"123xxx")
            # Create a reference cycle so only the cyclic collector can
            # free the writer.
            writer.x = writer
            writer_ref = weakref.ref(writer)
            del writer
            support.gc_collect()
        self.assertTrue(writer_ref() is None, writer_ref)
        with self.open(support.TESTFN, "rb") as saved:
            self.assertEqual(saved.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1434
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001435
class PyBufferedWriterTest(BufferedWriterTest):
    # Binds the shared BufferedWriterTest suite to pyio.BufferedWriter
    # (presumably the pure-Python implementation -- confirm against the
    # file's imports).
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001438
class BufferedRWPairTest(unittest.TestCase):
    # Shared tests for a reader/writer pair object.  The class under test is
    # read from self.tp, which the concrete subclasses set; the other
    # self.* helper classes are expected to be attached elsewhere.

    def test_constructor(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)

    def test_uninitialized(self):
        # Creating and dropping an object that never ran __init__ must be
        # harmless.
        rw_pair = self.tp.__new__(self.tp)
        del rw_pair
        rw_pair = self.tp.__new__(self.tp)
        expected_errors = (ValueError, AttributeError)
        expected_message = 'uninitialized|has no attribute'
        self.assertRaisesRegex(expected_errors, expected_message,
                               rw_pair.read, 0)
        self.assertRaisesRegex(expected_errors, expected_message,
                               rw_pair.write, b'')
        # Once initialized, the same object must work normally.
        rw_pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(rw_pair.read(0), b'')
        self.assertEqual(rw_pair.write(b''), 0)

    def test_detach(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        with self.assertRaises(self.UnsupportedOperation):
            rw_pair.detach()

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument (the former max_buffer_size) is
        # rejected with TypeError.
        with self.assertRaises(TypeError):
            reader = self.MockRawIO()
            writer = self.MockRawIO()
            self.tp(reader, writer, 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        reader, writer = NotReadable(), self.MockRawIO()
        with self.assertRaises(OSError):
            self.tp(reader, writer)

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        reader, writer = self.MockRawIO(), NotWriteable()
        with self.assertRaises(OSError):
            self.tp(reader, writer)

    def test_read(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        for size, expected in ((3, b"abc"), (1, b"d")):
            self.assertEqual(rw_pair.read(size), expected)
        self.assertEqual(rw_pair.read(), b"ef")
        # read(None) behaves like read() with no argument.
        rw_pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(rw_pair.read(None), b"abc")

    def test_readlines(self):
        def make_pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        all_lines = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(make_pair().readlines(), all_lines)
        self.assertEqual(make_pair().readlines(), all_lines)
        self.assertEqual(make_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(rw_pair.read1(3), b"abc")

    def test_readinto(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        target = bytearray(5)
        self.assertEqual(rw_pair.readinto(target), 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        writer_raw = self.MockRawIO()
        rw_pair = self.tp(self.MockRawIO(), writer_raw)

        # Flushing after each write should hand the raw writer one chunk
        # per write, in order.
        for chunk in (b"abc", b"def"):
            rw_pair.write(chunk)
            rw_pair.flush()
        self.assertEqual(writer_raw._write_stack, [b"abc", b"def"])

    def test_peek(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        peeked = rw_pair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        # Peeking must not consume the data.
        self.assertEqual(rw_pair.read(3), b"abc")

    def test_readable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.readable())

    def test_writeable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)
        rw_pair.close()
        self.assertTrue(rw_pair.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side does.
        combinations = ((False, False), (True, False),
                        (False, True), (True, True))
        for reader_tty, writer_tty in combinations:
            rw_pair = self.tp(SelectableIsAtty(reader_tty),
                              SelectableIsAtty(writer_tty))
            if reader_tty or writer_tty:
                self.assertTrue(rw_pair.isatty())
            else:
                self.assertFalse(rw_pair.isatty())

    def test_weakref_clearing(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        weak = weakref.ref(rw_pair)
        # Drop the object first, then the weak reference to it.
        rw_pair = None
        weak = None  # Shouldn't segfault.
1575
class CBufferedRWPairTest(BufferedRWPairTest):
    # Binds the shared BufferedRWPairTest suite to io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001578
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Binds the shared BufferedRWPairTest suite to pyio.BufferedRWPair
    # (presumably the pure-Python implementation -- confirm against the
    # file's imports).
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001581
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001582
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Combines the reader and writer suites for an object that supports both
    # directions, and adds tests that interleave reads, writes and seeks.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run both parents' versions explicitly, reader first.
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_constructor(self)

    def test_uninitialized(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_uninitialized(self)

    def test_read_and_write(self):
        raw_stream = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(raw_stream, 8)

        self.assertEqual(b"as", stream.read(2))
        for chunk in (b"ddd", b"eee"):
            stream.write(chunk)
        # Six bytes fit in the 8-byte buffer, so nothing has been handed to
        # the raw stream yet.
        self.assertFalse(raw_stream._write_stack)
        self.assertEqual(b"ghjk", stream.read())
        self.assertEqual(b"dddeee", raw_stream._write_stack[0])

    def test_seek_and_tell(self):
        raw_stream = self.BytesIO(b"asdfghjkl")
        stream = self.tp(raw_stream)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        # Overwrite four bytes at the current position, then re-read all.
        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        # Seek relative to the end, then relative to the current position.
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", raw_stream.getvalue())

        with self.assertRaises(TypeError):
            stream.seek(0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario: read_func(stream[, n]) must return the bytes read
        # and advance the position by that many bytes.
        raw_stream = self.BytesIO(b"abcdefghi")
        stream = self.tp(raw_stream)

        self.assertEqual(b"ab", read_func(stream, 2))
        stream.write(b"12")
        self.assertEqual(b"ef", read_func(stream, 2))
        self.assertEqual(6, stream.tell())
        stream.flush()
        self.assertEqual(6, stream.tell())
        self.assertEqual(b"ghi", read_func(stream))
        # Modify the raw stream behind the buffered object's back.
        raw_stream.seek(0, 0)
        raw_stream.write(b"XYZ")
        # flush() resets the read buffer
        stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(stream, 3))

    def test_flush_and_read(self):
        def plain_read(stream, *args):
            return stream.read(*args)
        self.check_flush_and_read(plain_read)

    def test_flush_and_readinto(self):
        def read_via_readinto(stream, n=-1):
            # With no size given, use a target far larger than the data.
            target = bytearray(n if n >= 0 else 9999)
            filled = stream.readinto(target)
            return bytes(target[:filled])
        self.check_flush_and_read(read_via_readinto)

    def test_flush_and_peek(self):
        def peek_and_advance(stream, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = stream.peek(n)
            if n != -1:
                data = data[:n]
            # peek() does not move the position, so advance it by hand.
            stream.seek(len(data), 1)
            return data
        self.check_flush_and_read(peek_and_advance)

    def test_flush_and_write(self):
        raw_stream = self.BytesIO(b"abcdefghi")
        stream = self.tp(raw_stream)

        for chunk in (b"123", b"45"):
            stream.write(chunk)
            stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"12345fghi", raw_stream.getvalue())
        self.assertEqual(b"12345fghi", stream.read())

    def test_threads(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_threads(self)

    def test_writes_and_peek(self):
        def peek_in_place(stream):
            stream.peek(1)
        self.check_writes(peek_in_place)

        def peek_one_back(stream):
            # Step back one byte, peek, then restore the position.
            saved_pos = stream.tell()
            stream.seek(-1, 1)
            stream.peek(1)
            stream.seek(saved_pos, 0)
        self.check_writes(peek_one_back)

    def test_writes_and_reads(self):
        def reread_last_byte(stream):
            stream.seek(-1, 1)
            stream.read(1)
        self.check_writes(reread_last_byte)

    def test_writes_and_read1s(self):
        def reread_last_byte(stream):
            stream.seek(-1, 1)
            stream.read1(1)
        self.check_writes(reread_last_byte)

    def test_writes_and_readintos(self):
        def reread_last_byte(stream):
            stream.seek(-1, 1)
            stream.readinto(bytearray(1))
        self.check_writes(reread_last_byte)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in (1, 5):
            raw_stream = self.BytesIO(b"A" * 10)
            stream = self.tp(raw_stream, 4)
            # Trigger readahead
            self.assertEqual(stream.read(1), b"A")
            self.assertEqual(stream.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            stream.write(b"B" * overwrite_size)
            position_after_write = overwrite_size + 1
            self.assertEqual(stream.tell(), position_after_write)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            stream.flush()
            self.assertEqual(stream.tell(), position_after_write)
            expected = (b"A" + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            self.assertEqual(raw_stream.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(stream, first, second):
            assert second >= first
            # Fill the buffer
            stream.seek(first)
            stream.read(second - first)
            stream.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            stream.seek(first)
            stream.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for i in range(size):
            for j in range(i, size):
                raw_stream = self.BytesIO(original)
                stream = self.tp(raw_stream, 100)
                mutate(stream, i, j)
                stream.flush()
                # The write at i happens last, so it wins when i == j.
                expected = bytearray(original)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw_stream.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        raw_stream = self.BytesIO(b"A" * 10)
        stream = self.tp(raw_stream, 100)
        # the read buffer gets filled
        self.assertEqual(stream.read(2), b"AA")
        self.assertEqual(stream.truncate(), 2)
        # the write buffer increases
        self.assertEqual(stream.write(b"BB"), 2)
        self.assertEqual(stream.truncate(), 4)

    def test_misbehaved_io(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw_stream, \
                self.tp(raw_stream, 100) as stream:
            stream.write(b"1")
            self.assertEqual(stream.read(1), b'b')
            stream.write(b'2')
            self.assertEqual(stream.read1(1), b'd')
            stream.write(b'3')
            one_byte = bytearray(1)
            stream.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            stream.write(b'4')
            self.assertEqual(stream.peek(1), b'h')
            stream.flush()
            self.assertEqual(raw_stream.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw_stream, \
                self.tp(raw_stream, 100) as stream:
            self.assertEqual(stream.read(1), b'a')
            stream.write(b"2")
            self.assertEqual(stream.read(1), b'c')
            stream.flush()
            self.assertEqual(raw_stream.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as raw_stream, \
                self.tp(raw_stream) as stream:
            # Each write replaces one byte; the following readline() must
            # return the rest of that line.
            stream.write(b'1')
            self.assertEqual(stream.readline(), b'b\n')
            stream.write(b'2')
            self.assertEqual(stream.readline(), b'def\n')
            stream.write(b'3')
            self.assertEqual(stream.readline(), b'\n')
            stream.flush()
            self.assertEqual(raw_stream.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1810
R David Murray67bfe802013-02-23 21:51:05 -05001811
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Runs the shared BufferedRandomTest suite against io.BufferedRandom and
    # adds checks specific to that implementation.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw_stream = self.MockRawIO()
        stream = self.tp(raw_stream)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            stream.__init__(raw_stream, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the reader and writer garbage-collection tests, in that
        # order.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1833
1834
class PyBufferedRandomTest(BufferedRandomTest):
    # Binds the shared BufferedRandomTest suite to pyio.BufferedRandom
    # (presumably the pure-Python implementation -- confirm against the
    # file's imports).
    tp = pyio.BufferedRandom
1837
1838
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001839# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1840# properties:
1841# - A single output character can correspond to many bytes of input.
1842# - The number of input bytes to complete the character can be
1843# undetermined until the last input byte is received.
1844# - The number of input bytes can vary depending on previous input.
1845# - A single input byte can correspond to many characters of output.
1846# - The number of output characters can be undetermined until the
1847# last input byte is received.
1848# - The number of output characters can vary depending on previous input.
1849
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    Stateful, buffering decoder for exercising seek/tell behavior.

    The input is consumed as a sequence of words.  While the input length I
    is non-zero, a word is exactly I bytes long; when I is 0, a word runs up
    to the next period and periods with nothing buffered are ignored.  Each
    completed word is handled as follows:
      - 'i' followed by a number sets the input length I (capped at 99).
        Setting I to 0 switches to period-terminated words.
      - 'o' followed by a number sets the output length O (capped at 99).
      - Any other word is written to the output followed by a period.  When
        O is non-zero the word is first padded with hyphens or truncated so
        that it is exactly O characters long; when O is 0 it is written
        verbatim.
    A missing number after 'i' or 'o' counts as 0.  I and O both start at 1.
    When decode() is called with a true *final*, any bytes still buffered
    are processed as the last word.
    """

    # lookupTestDecoder() only answers while this flag is true.
    codecEnabled = False

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        # Initial state: I = O = 1 and an empty word buffer.
        self.i = self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        # The low bit of I and O is flipped so that the flags value is 0
        # right after reset(); the two values are then packed as base-100
        # digits.
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        period = ord('.')
        pieces = []
        for byte in input:
            if self.i:
                # fixed-length, terminate after self.i bytes
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != period:
                # variable-length: keep buffering until a period arrives
                self.buffer.append(byte)
            elif self.buffer:
                # a period ends the word; stray periods are skipped
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        # Interpret the buffered word, clear the buffer and return whatever
        # text the word contributes to the output ('' for 'i'/'o' words).
        marker = self.buffer[0]
        if marker == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
            output = ''
        elif marker == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
            output = ''
        else:
            word = self.buffer.decode('ascii')
            if self.o:
                # pad out with hyphens, then truncate to output length
                word = word.ljust(self.o, '-')[:self.o]
            output = word + '.'
        self.buffer = bytearray()
        return output

    @classmethod
    def lookupTestDecoder(cls, name):
        # Codec search function: answers only for 'test_decoder', and only
        # while codecEnabled is true; otherwise returns None.
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder',
            encode=latin1.encode,
            decode=None,
            incrementalencoder=None,
            incrementaldecoder=cls,
            streamreader=None,
            streamwriter=None)
1934
# Register the decoder's lookup function with the codec registry so that it
# can be requested under the name 'test_decoder'.
# Disabled by default (codecEnabled is False); tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1938
1939
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Sanity checks for the StatefulIncrementalDecoder helper itself.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.'),
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, is_final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, is_final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001982
1983class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001984
def setUp(self):
    # Sample bytes that mix "\r\n", "\r" and "\n" line endings, and the same
    # five lines as text with every line ending written as "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
    # Unlink support.TESTFN before each test (presumably tolerating a
    # missing file -- confirm against support.unlink).
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001989
def tearDown(self):
    # Unlink support.TESTFN again after each test, mirroring setUp.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001992
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001993 def test_constructor(self):
1994 r = self.BytesIO(b"\xc3\xa9\n\n")
1995 b = self.BufferedReader(r, 1000)
1996 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001997 t.__init__(b, encoding="latin-1", newline="\r\n")
1998 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001999 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002000 t.__init__(b, encoding="utf-8", line_buffering=True)
2001 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002002 self.assertEqual(t.line_buffering, True)
2003 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002004 self.assertRaises(TypeError, t.__init__, b, newline=42)
2005 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2006
Nick Coghlana9b15242014-02-04 22:11:18 +10002007 def test_non_text_encoding_codecs_are_rejected(self):
2008 # Ensure the constructor complains if passed a codec that isn't
2009 # marked as a text encoding
2010 # http://bugs.python.org/issue20404
2011 r = self.BytesIO()
2012 b = self.BufferedWriter(r)
2013 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2014 self.TextIOWrapper(b, encoding="hex")
2015
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002016 def test_detach(self):
2017 r = self.BytesIO()
2018 b = self.BufferedWriter(r)
2019 t = self.TextIOWrapper(b)
2020 self.assertIs(t.detach(), b)
2021
2022 t = self.TextIOWrapper(b, encoding="ascii")
2023 t.write("howdy")
2024 self.assertFalse(r.getvalue())
2025 t.detach()
2026 self.assertEqual(r.getvalue(), b"howdy")
2027 self.assertRaises(ValueError, t.detach)
2028
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002029 def test_repr(self):
2030 raw = self.BytesIO("hello".encode("utf-8"))
2031 b = self.BufferedReader(raw)
2032 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002033 modname = self.TextIOWrapper.__module__
2034 self.assertEqual(repr(t),
2035 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2036 raw.name = "dummy"
2037 self.assertEqual(repr(t),
2038 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002039 t.mode = "r"
2040 self.assertEqual(repr(t),
2041 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002042 raw.name = b"dummy"
2043 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002044 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002045
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002046 def test_line_buffering(self):
2047 r = self.BytesIO()
2048 b = self.BufferedWriter(r, 1000)
2049 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002050 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002051 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002052 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002053 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002054 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002055 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002056
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002057 def test_default_encoding(self):
2058 old_environ = dict(os.environ)
2059 try:
2060 # try to get a user preferred encoding different than the current
2061 # locale encoding to check that TextIOWrapper() uses the current
2062 # locale encoding and not the user preferred encoding
2063 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2064 if key in os.environ:
2065 del os.environ[key]
2066
2067 current_locale_encoding = locale.getpreferredencoding(False)
2068 b = self.BytesIO()
2069 t = self.TextIOWrapper(b)
2070 self.assertEqual(t.encoding, current_locale_encoding)
2071 finally:
2072 os.environ.clear()
2073 os.environ.update(old_environ)
2074
@support.cpython_only
def test_device_encoding(self):
    # Issue 15989: a fileno() value just past INT_MAX or UINT_MAX must make
    # the constructor raise OverflowError.
    import _testcapi
    stream = self.BytesIO()
    for limit in (_testcapi.INT_MAX, _testcapi.UINT_MAX):
        stream.fileno = lambda limit=limit: limit + 1
        with self.assertRaises(OverflowError):
            self.TextIOWrapper(stream)
2084
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002085 def test_encoding(self):
2086 # Check the encoding attribute is always set, and valid
2087 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002088 t = self.TextIOWrapper(b, encoding="utf-8")
2089 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002090 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002091 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002092 codecs.lookup(t.encoding)
2093
def test_encoding_errors_reading(self):
    # Decode the same payload (it contains a non-ASCII byte) as ASCII
    # under each error handler.
    payload = b"abc\n\xff\n"

    def reader(**options):
        return self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                                  **options)

    # (1) default
    self.assertRaises(UnicodeError, reader().read)
    # (2) explicit strict
    self.assertRaises(UnicodeError, reader(errors="strict").read)
    # (3) ignore
    self.assertEqual(reader(errors="ignore").read(), "abc\n\n")
    # (4) replace
    self.assertEqual(reader(errors="replace").read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002111
def test_encoding_errors_writing(self):
    # (1) default and (2) explicit strict: the non-ASCII character is
    # rejected by write().
    for options in ({}, {"errors": "strict"}):
        raw = self.BytesIO()
        writer = self.TextIOWrapper(raw, encoding="ascii", **options)
        self.assertRaises(UnicodeError, writer.write, "\xff")
    # (3) ignore and (4) replace: the write succeeds and the offending
    # character is dropped or substituted in the encoded output.
    lenient = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for handler, expected in lenient:
        raw = self.BytesIO()
        writer = self.TextIOWrapper(raw, encoding="ascii", errors=handler,
                                    newline="\n")
        writer.write("abc\xffdef\n")
        writer.flush()
        self.assertEqual(raw.getvalue(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002135
def test_newlines(self):
    # Read the same mixed-newline text under every newline mode and
    # encoding, with several small buffer sizes, both by iterating over
    # the wrapper and by interleaving read(2) with readline().
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

    # (newline argument, lines expected back)
    tests = [
        [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
        ['', input_lines],
        ['\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]],
        ['\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]],
        ['\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = ''.join(input_lines).encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        got_lines = []
                        while True:
                            c2 = textio.read(2)
                            if c2 == '':
                                break
                            self.assertEqual(len(c2), 2)
                            got_lines.append(c2 + textio.readline())
                    else:
                        got_lines = list(textio)

                    # One list comparison checks both content and length
                    # and shows a full diff on failure.
                    self.assertEqual(got_lines, exp_lines)
Guido van Rossum78892e42007-04-06 17:31:18 +00002177
def test_newlines_input(self):
    # For each newline mode, readlines() must split the data as listed
    # and read() must return the same text in one piece.
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    universal = normalized.decode("ascii").splitlines(keepends=True)
    untranslated = testdata.decode("ascii").splitlines(keepends=True)
    cases = [
        (None, universal),
        ("", untranslated),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        raw = self.BytesIO(testdata)
        txt = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        self.assertEqual(txt.readlines(), expected)
        txt.seek(0)
        self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002193
def test_newlines_output(self):
    # Bytes expected in the buffer for each explicit newline argument.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to produce the os.linesep variant.
    cases = [(None, testdict[os.linesep])]
    cases.extend(sorted(testdict.items()))
    pieces = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in cases:
        buf = self.BytesIO()
        txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
        for piece in pieces:
            txt.write(piece)
        txt.flush()
        self.assertEqual(buf.closed, False)
        self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002211
def test_destructor(self):
    # Dropping the last reference to the wrapper must close the buffer,
    # and by then the pending text must have reached it.
    seen_at_close = []
    base = self.BytesIO

    class MyBytesIO(base):
        def close(self):
            # Record what the buffer holds when close() is invoked.
            seen_at_close.append(self.getvalue())
            base.close(self)

    b = MyBytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    t.write("abc")
    del t
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002225
def test_override_destructor(self):
    # An overridden __del__ that chains to the parent must result in
    # close() and then flush() being called, in that order.
    record = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            # The parent class may not define __del__ at all.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            record.append(2)
            super().close()

        def flush(self):
            record.append(3)
            super().flush()

    b = self.BytesIO()
    t = MyTextIO(b, encoding="ascii")
    del t
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
2248
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception OSError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00002263
Guido van Rossum9b76da62007-04-11 01:09:03 +00002264 # Systematic tests of the text I/O API
2265
def test_basic_io(self):
    # Exercise write/read/seek/tell on a real file for several chunk
    # sizes and encodings.  Files are opened in "with" blocks so a
    # failing assertion cannot leave a handle open on TESTFN.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Opaque position just past "abc", reused below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2294
def multi_line_test(self, f, enc):
    # Helper: empty the file, write lines of assorted lengths while
    # recording tell() before each one, then read them back and check
    # that every (position, line) pair matches.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of `size`.
        text = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    read_back = []
    # The tuple is evaluated left to right: tell() first, then readline().
    where, text = f.tell(), f.readline()
    while text:
        read_back.append((where, text))
        where, text = f.tell(), f.readline()
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002316
def test_telling(self):
    # tell() must report the same positions when reading lines back as
    # it did when they were written.  The file is managed by "with" so
    # it is closed even if an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail while iteration is in progress.
            self.assertRaises(OSError, f.tell)
        # ...and to work again once the iteration is exhausted.
        self.assertEqual(f.tell(), p2)
2336
def test_seeking(self):
    # Build a line whose ASCII prefix ends two bytes short of the value
    # returned by _default_chunk_size() (presumably the wrapper's default
    # chunk size), followed by a multi-byte character and a newline.
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002353
def test_seeking_too(self):
    # Regression test for a specific bug
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as out:
        out.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        # A chunk smaller than the 3-byte character in the payload.
        f._CHUNK_SIZE = 2
        f.readline()
        # Passes as long as tell() does not raise.
        f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002364
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a "with" block so that a failing
        # assertion cannot leak an open handle on TESTFN.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Rewind to the cookie and re-read the remainder.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(payload)

        # Position each test case so that it crosses a chunk boundary.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(payload)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + payload, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002411
def test_encoded_writes(self):
    data = "1234567890"
    doubled = data * 2
    bom_encodings = ("utf-16",
                     "utf-16-le",
                     "utf-16-be",
                     "utf-32",
                     "utf-32-le",
                     "utf-32-be")
    for encoding in bom_encodings:
        buf = self.BytesIO()
        f = self.TextIOWrapper(buf, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        for _ in range(2):
            f.write(data)
        # Read everything back twice, rewinding before each read.
        for _ in range(2):
            f.seek(0)
            self.assertEqual(f.read(), doubled)
        self.assertEqual(buf.getvalue(), doubled.encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002431
def test_unreadable(self):
    # read() on a wrapper whose buffer reports readable() == False
    # must raise OSError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(UnReadable())
    with self.assertRaises(OSError):
        wrapper.read()
Benjamin Petersona1b49012009-03-31 23:11:32 +00002438
def test_read_one_by_one(self):
    # Reading one character at a time must still turn "\r\n" into "\n".
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # Call read(1) until it returns the empty string.
    reads = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002448
def test_readlines(self):
    # readlines() with no hint, with None, and with a size hint.
    txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    self.assertEqual(txt.readlines(), every_line)
    txt.seek(0)
    self.assertEqual(txt.readlines(None), every_line)
    txt.seek(0)
    # With a hint of 5 only the first two lines are expected back.
    self.assertEqual(txt.readlines(5), every_line[:2])
2456
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002457 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    # Call read(128) until it returns the empty string.
    reads = "".join(iter(lambda: txt.read(128), ""))
    self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002468
def test_writelines(self):
    # writelines() accepts a plain list of strings.
    lines = ['ab', 'cd', 'ef']
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(lines)
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2476
def test_writelines_userlist(self):
    # writelines() also accepts a list-like object that is not a list.
    lines = UserList(['ab', 'cd', 'ef'])
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(lines)
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2484
def test_writelines_error(self):
    # Arguments that are not iterables of str must raise TypeError.
    txt = self.TextIOWrapper(self.BytesIO())
    for bad_argument in ([1, 2, 3], None, b'abc'):
        with self.assertRaises(TypeError):
            txt.writelines(bad_argument)
2490
def test_issue1395_1(self):
    # self.testdata / self.normalized are fixtures set outside this
    # method (presumably in setUp -- TODO confirm).
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    # read one char at a time
    reads = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002502
def test_issue1395_2(self):
    # Read in pieces of 4 characters with the chunk size also set to 4.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Call read(4) until it returns the empty string.
    reads = "".join(iter(lambda: txt.read(4), ""))
    self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002514
def test_issue1395_3(self):
    # Mix two read(4) calls with three readline() calls.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # List elements are evaluated in order, so the calls happen in the
    # sequence written here.
    pieces = [
        txt.read(4),
        txt.read(4),
        txt.readline(),
        txt.readline(),
        txt.readline(),
    ]
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002525
def test_issue1395_4(self):
    # A short read(4) followed by a read() of everything that is left.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    head = txt.read(4)
    tail = txt.read()
    self.assertEqual(head + tail, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002533
def test_issue1395_5(self):
    # Seeking back to a position obtained from tell() must resume
    # reading at the same place.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Advance four characters; the text itself is not needed.
    txt.read(4)
    bookmark = txt.tell()
    txt.seek(0)
    txt.seek(bookmark)
    self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002543
def test_issue2282(self):
    # The wrapper's seekable() must agree with its underlying buffer's.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
2549
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN

    def file_bytes():
        # Raw content of the file as currently stored.
        with self.open(filename, 'rb') as f:
            return f.read()

    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            pos = f.tell()
        self.assertEqual(file_bytes(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        self.assertEqual(file_bytes(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002564
def test_seek_bom(self):
    # Same test, but when seeking manually
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_of_text = writer.tell()
        with self.open(filename, 'r+', encoding=charset) as updater:
            # Append after the existing text, then overwrite the start.
            updater.seek(end_of_text)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(filename, 'rb') as raw:
            stored = raw.read()
        self.assertEqual(stored, 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002579
def test_errors_property(self):
    # .errors reports "strict" when no handler is given, otherwise the
    # handler passed to open().
    for options, expected in (({}, "strict"),
                              ({"errors": "replace"}, "replace")):
        with self.open(support.TESTFN, "w", **options) as f:
            self.assertEqual(f.errors, expected)
2585
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    event = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Hold every thread until the event is set below.
            event.wait()
            f.write(text)

        workers = []
        for number in range(20):
            workers.append(threading.Thread(target=run, args=(number,)))
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        event.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for number in range(20):
            self.assertEqual(content.count("Thread%03d\n" % number), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002608
def test_flush_error_on_close(self):
    # An error raised by flush() during close() must propagate, and the
    # wrapper must still end up closed.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    def bad_flush():
        raise OSError()

    txt.flush = bad_flush
    with self.assertRaises(OSError):  # exception not swallowed
        txt.close()
    self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002616
def test_close_error_on_close(self):
    # When both flush() and the buffer's close() fail, the close error
    # is the one raised and the flush error is chained as its context.
    def bad_flush():
        raise OSError('flush')

    def bad_close():
        raise OSError('close')

    buffer = self.BytesIO(self.testdata)
    buffer.close = bad_close
    txt = self.TextIOWrapper(buffer, encoding="ascii")
    txt.flush = bad_flush
    with self.assertRaises(OSError) as err:  # exception not swallowed
        txt.close()
    raised = err.exception
    self.assertEqual(raised.args, ('close',))
    self.assertIsInstance(raised.__context__, OSError)
    self.assertEqual(raised.__context__.args, ('flush',))
    self.assertFalse(txt.closed)
2632
def test_nonnormalized_close_error_on_close(self):
    # Issue #21677
    # Both replacement methods refer to names that do not exist, so each
    # call raises NameError.
    def bad_flush():
        raise non_existing_flush

    def bad_close():
        raise non_existing_close

    buffer = self.BytesIO(self.testdata)
    buffer.close = bad_close
    txt = self.TextIOWrapper(buffer, encoding="ascii")
    txt.flush = bad_flush
    with self.assertRaises(NameError) as err:  # exception not swallowed
        txt.close()
    raised = err.exception
    self.assertIn('non_existing_close', str(raised))
    self.assertIsInstance(raised.__context__, NameError)
    self.assertIn('non_existing_flush', str(raised.__context__))
    self.assertFalse(txt.closed)
2649
def test_multi_close(self):
    # close() may be called repeatedly; flush() afterwards must raise
    # ValueError.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    for _ in range(3):
        txt.close()
    with self.assertRaises(ValueError):
        txt.flush()
2656
def test_unseekable(self):
    # tell() and seek() on top of a MockUnseekableIO buffer must raise
    # UnsupportedOperation.
    txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        txt.tell()
    with self.assertRaises(self.UnsupportedOperation):
        txt.seek(0)
2661
def test_readonly_attributes(self):
    # Assigning to the wrapper's .buffer attribute must be rejected.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    self.assertRaises(AttributeError, setattr, txt, "buffer", replacement)
2667
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    txt = self.TextIOWrapper(self.MockRawIO(chunks), encoding='ascii',
                             newline='\n')
    # Reads
    self.assertEqual(txt.read(4), 'abcd')
    self.assertEqual(txt.readline(), 'efghi\n')
    remaining = list(txt)
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
2678
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                             write_through=True)
    for piece in ('1', '23\n4', '5'):
        txt.write(piece)
    # No flush() was called, yet everything must be in the raw object.
    received = b''.join(raw._write_stack)
    self.assertEqual(received, b'123\n45')
2688
def test_bufio_write_through(self):
    # Issue #21396: write_through=True doesn't force a flush()
    # on the underlying binary buffered object.
    flush_called, write_called = [], []

    class RecordingWriter(self.BufferedWriter):
        # Notes every flush()/write() call before delegating to the
        # real implementation.
        def flush(self, *args, **kwargs):
            flush_called.append(True)
            return super().flush(*args, **kwargs)

        def write(self, *args, **kwargs):
            write_called.append(True)
            return super().write(*args, **kwargs)

    rawio = self.BytesIO()
    data = b"a"
    bufio = RecordingWriter(rawio, len(data)*2)
    textio = self.TextIOWrapper(bufio, encoding='ascii',
                                write_through=True)
    # write to the buffered io but don't overflow the buffer
    text = data.decode('ascii')
    textio.write(text)

    # buffer.flush is not called with write_through=True
    self.assertFalse(flush_called)
    # buffer.write *is* called with write_through=True
    self.assertTrue(write_called)
    self.assertEqual(rawio.getvalue(), b"")  # no flush

    del write_called[:]  # reset
    textio.write(text * 10)  # total content is larger than bufio buffer
    self.assertTrue(write_called)
    self.assertEqual(rawio.getvalue(), data * 11)  # all flushed
2720
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes
    # Each call gets a fresh wrapper around a StringIO.
    calls = (
        ("read", (1,)),
        ("readline", ()),
        ("read", ()),
    )
    for method_name, args in calls:
        t = self.TextIOWrapper(self.StringIO('a'))
        self.assertRaises(TypeError, getattr(t, method_name), *args)
2730
def test_illegal_decoder(self):
    # Issue #17106
    # Bypass the early encoding check added in issue 20404
    def _make_illegal_wrapper():
        quopri = codecs.lookup("quopri")
        # Flag the codec as a text encoding only while the wrapper is
        # being constructed.
        quopri._is_text_encoding = True
        try:
            return self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                      newline='\n', encoding="quopri")
        finally:
            quopri._is_text_encoding = False

    # Crash when decoder returns non-string
    calls = (
        ("read", (1,)),
        ("readline", ()),
        ("read", ()),
    )
    for method_name, args in calls:
        t = _make_illegal_wrapper()
        self.assertRaises(TypeError, getattr(t, method_name), *args)
2750
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    #
    # Runs a child interpreter (via assert_python_ok) on a small script
    # whose global object creates a TextIOWrapper from its __del__ and then
    # prints "ok".  ``kwargs`` is formatted into the script as a dict
    # literal and expanded into the TextIOWrapper() call.  Returns whatever
    # assert_python_ok returns; the callers unpack it as (rc, out, err).
    iomod = self.io.__name__
    # NOTE(review): the indentation inside this literal is runtime text;
    # the leading "if 1:" lets the indented script body compile as-is.
    code = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """.format(iomod=iomod, kwargs=kwargs)
    return assert_python_ok("-c", code)
2771
def test_create_at_shutdown_without_encoding(self):
    # Without an explicit encoding the child interpreter either prints
    # "ok" or reports the implementation-specific shutdown error.
    _, stdout, stderr = self._check_create_at_shutdown()
    if not stderr:
        self.assertEqual("ok", stdout.decode().strip())
        return
    # Can error out with a RuntimeError if the module state
    # isn't found.
    self.assertIn(self.shutdown_error, stderr.decode())
2780
def test_create_at_shutdown_with_encoding(self):
    # With encoding and errors given explicitly, the child interpreter
    # must print "ok" and leave stderr empty.
    wrapper_kwargs = {'encoding': 'utf-8', 'errors': 'strict'}
    _, stdout, stderr = self._check_create_at_shutdown(**wrapper_kwargs)
    self.assertFalse(stderr)
    self.assertEqual("ok", stdout.decode().strip())
2786
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002787
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest bound to the ``io`` module, plus C-specific tests."""

    io = io
    # Substring that test_create_at_shutdown_without_encoding accepts on
    # the child interpreter's stderr.
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        # After each failed __init__() call, read() must raise ValueError.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.read)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
        self.assertRaises(ValueError, t.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            rawio = io.FileIO(support.TESTFN, "wb")
            b = self.BufferedWriter(rawio)
            t = self.TextIOWrapper(b, encoding="ascii")
            t.write("456def")
            # Reference cycle: only the cyclic GC can reclaim the wrapper.
            t.x = t
            wr = weakref.ref(t)
            del t
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t1 = self.TextIOWrapper(b1, encoding="ascii")
            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t2 = self.TextIOWrapper(b2, encoding="ascii")
            # circular references
            t1.buddy = t2
            t2.buddy = t1
        support.gc_collect()
2831
2832
class PyTextIOWrapperTest(TextIOWrapperTest):
    # TextIOWrapperTest bound to the ``pyio`` module.
    io = pyio
    # Substring that test_create_at_shutdown_without_encoding accepts on
    # the child interpreter's stderr.  The commented-out value is
    # presumably an earlier expected message, kept for reference.
    #shutdown_error = "LookupError: unknown encoding: ascii"
    shutdown_error = "TypeError: 'NoneType' object is not iterable"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002837
2838
class IncrementalNewlineDecoderTest(unittest.TestCase):
    """Tests for ``self.IncrementalNewlineDecoder``.

    The class under test is read from the ``IncrementalNewlineDecoder``
    attribute, which is not defined here; the concrete subclasses receive
    it from outside this class.
    """

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        _check_decode(b'\xe8\xa2\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        # A truncated multi-byte sequence must be reported on final=True.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_decode(b'\n', "\n")
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        """Feed *decoder* one unit at a time and check its ``newlines``.

        With an *encoding*, text is encoded and fed byte by byte; with
        ``encoding=None`` the decoder is fed one character at a time.
        """
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(bytes([b])))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertIsNone(decoder.newlines)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        decoder.reset()
        # After reset() the decoder must have forgotten the newlines seen.
        data = "abc"
        if encoder is not None:
            encoder.reset()
            data = encoder.encode(data)
        self.assertEqual(decoder.decode(data), "abc")
        self.assertIsNone(decoder.newlines)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(decoder, enc)
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
        self.check_newline_decoding_utf8(decoder)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 must pass through unchanged and must not
            # be recorded as newlines.
            self.assertIsNone(dec.newlines)
            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
            self.assertIsNone(dec.newlines)
            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
            self.assertIsNone(dec.newlines)
        dec = self.IncrementalNewlineDecoder(None, translate=False)
        _check(dec)
        dec = self.IncrementalNewlineDecoder(None, translate=True)
        _check(dec)
2944
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Variant for the C implementation.  It adds nothing itself:
    # load_tests() sets the io names (including IncrementalNewlineDecoder)
    # on every listed test class whose name starts with "C".
    pass
2947
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Variant for the pure-Python implementation.  It adds nothing itself;
    # presumably load_tests() sets the pyio names on it, as it does for the
    # "C" classes -- TODO confirm against the full load_tests() body.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002950
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002951
Guido van Rossum01a27522007-03-07 01:00:12 +00002952# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002953
class MiscIOTest(unittest.TestCase):
    """Miscellaneous tests for the io namespace and for ``open()``.

    The tests use ``self.io`` (set by the concrete subclasses) and names
    such as ``self.open`` and ``self.IOBase`` that are not defined in this
    class.
    """

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        # Every file is opened in a ``with`` block so that it is closed
        # even when an assertion fails.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        # Only the open() call itself is expected to emit the warning.
        with support.check_warnings(('', DeprecationWarning)):
            f = self.open(support.TESTFN, "U")
        with f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second object on the same descriptor reports the fd as
            # its name; closefd=False leaves closing the fd to ``f``.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        # Every I/O method must raise ValueError once the file is closed.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readall"):
                self.assertRaises(ValueError, f.readall)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Reference cycle between the exception and its argument.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        """Check the ABC membership of raw, buffered and text files.

        *abcmodule* is any object exposing IOBase, RawIOBase,
        BufferedIOBase and TextIOBase attributes.
        """
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    def _check_warn_on_dealloc(self, *args, **kwargs):
        """Check that dropping an unclosed file emits a ResourceWarning.

        The arguments are passed to the builtin ``open()`` (not
        ``self.open``); the warning text must contain the file's repr.
        """
        f = open(*args, **kwargs)
        r = repr(f)
        with self.assertWarns(ResourceWarning) as cm:
            f = None
            support.gc_collect()
        self.assertIn(r, str(cm.warning.args[0]))

    def test_warn_on_dealloc(self):
        self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
        self._check_warn_on_dealloc(support.TESTFN, "wb")
        self._check_warn_on_dealloc(support.TESTFN, "w")

    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
        """Like _check_warn_on_dealloc(), for files opened from a pipe fd."""
        fds = []
        def cleanup_fds():
            # Some descriptors are already closed by the file objects;
            # only EBADF is tolerated.
            for fd in fds:
                try:
                    os.close(fd)
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise
        self.addCleanup(cleanup_fds)
        r, w = os.pipe()
        fds += r, w
        self._check_warn_on_dealloc(r, *args, **kwargs)
        # When using closefd=False, there's no warning
        r, w = os.pipe()
        fds += r, w
        with warnings.catch_warnings(record=True) as recorded:
            open(r, *args, closefd=False, **kwargs)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_warn_on_dealloc_fd(self):
        self._check_warn_on_dealloc_fd("rb", buffering=0)
        self._check_warn_on_dealloc_fd("rb")
        self._check_warn_on_dealloc_fd("r")


    def test_pickling(self):
        # Pickling file objects is forbidden
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+b", "buffering": 0},
            ]:
            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
                with self.open(support.TESTFN, **kwargs) as f:
                    self.assertRaises(TypeError, pickle.dumps, f, protocol)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_bigbuf(self):
        self._test_nonblock_pipe_write(16*1024)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_smallbuf(self):
        self._test_nonblock_pipe_write(1024)

    def _set_non_blocking(self, fd):
        """Set O_NONBLOCK on *fd* using fcntl."""
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        self.assertNotEqual(flags, -1)
        res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        self.assertEqual(res, 0)

    def _test_nonblock_pipe_write(self, bufsize):
        """Write through a non-blocking pipe and check nothing is lost.

        *bufsize* is the buffer size used for both buffered file objects.
        Everything recorded in ``sent`` must equal what is read back.
        """
        sent = []
        received = []
        r, w = os.pipe()
        self._set_non_blocking(r)
        self._set_non_blocking(w)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for N in 9999, 73, 7574:
                try:
                    i = 0
                    while True:
                        msg = bytes([i % 26 + 97]) * N
                        sent.append(msg)
                        wf.write(msg)
                        i += 1

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    # Only the accepted part of the last message counts.
                    sent[-1] = sent[-1][:e.characters_written]
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # Drain the pipe: keep reading until read() returns None.
            received += iter(rf.read, None)

        sent, received = b''.join(sent), b''.join(received)
        self.assertEqual(sent, received)
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)

    def test_create_fail(self):
        # 'x' mode fails if file is existing
        with self.open(support.TESTFN, 'w'):
            pass
        self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')

    def test_create_writes(self):
        # 'x' mode opens for writing
        with self.open(support.TESTFN, 'xb') as f:
            f.write(b"spam")
        with self.open(support.TESTFN, 'rb') as f:
            self.assertEqual(b"spam", f.read())

    def test_open_allargs(self):
        # there used to be a buffer overflow in the parser for rawmode
        self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3224
3225
class CMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``io`` module, plus one C-specific test."""

    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        # read() hands back far more data than the 2-byte target buffer
        # can hold; readinto() has to refuse it with ValueError.
        class OversizedReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6

        reader = OversizedReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)
3237
class PyMiscIOTest(MiscIOTest):
    # MiscIOTest bound to the ``pyio`` module.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003240
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003241
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Tests of I/O operations that get interrupted by SIGALRM.

    ``self.io`` is set by the concrete subclasses.
    """

    def setUp(self):
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Default SIGALRM handler for these tests: raise ZeroDivisionError.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter."""
        read_results = []
        def _read():
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # Bound before the try block so that the cleanup below cannot fail
        # with a NameError when open() itself raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed just below.  Since
            # the other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError,
                    wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
            finally:
                signal.alarm(0)
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    # Issue #22331: The test hangs on FreeBSD 7.2
    @support.requires_freebsd_version(8)
    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check a write() issued from a signal handler while the same
        file object is already inside write()/flush().

        Either the nested write raises RuntimeError ("reentrant call ...")
        or the handler's own ZeroDivisionError propagates.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel an alarm that may still be pending after a failure.
            signal.alarm(0)
            try:
                wio.close()
            finally:
                os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that the cleanup below cannot fail
        # with a NameError when open() itself raises.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel an alarm that may still be pending after a failure.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully."""
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        def _read():
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that the cleanup below cannot fail
        # with a NameError when open() itself raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel an alarm that may still be pending after a failure.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3436
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003437
class CSignalsTest(SignalsTest):
    # SignalsTest bound to the ``io`` module.
    io = io
3440
class PySignalsTest(SignalsTest):
    # Run the shared signal tests against the module imported as ``pyio``.
    io = pyio

    # The reentrancy tests are switched off for _pyio by overriding them
    # with None: handling reentrancy issues would slow down _pyio even more.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
3448
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003449
def load_tests(*args):
    """Build the test suite for this module (unittest ``load_tests`` hook).

    Before the suite is assembled, every test class whose name starts with
    "C" receives, as class attributes, the names listed in ``io.__all__``
    (plus ``IncrementalNewlineDecoder``) taken from ``io``; every class
    whose name starts with "Py" receives the same names taken from
    ``pyio``.  Each group also receives the matching ``C``/``Py`` flavour
    of every mock class, stored under the mock's unprefixed name.  Test
    classes with neither prefix are left untouched.

    The positional arguments supplied by the caller are accepted for
    compatibility with the ``load_tests`` protocol but are not used.

    Returns a ``unittest.TestSuite`` holding one sub-suite per test class.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        # "C" must be checked first; a name cannot match both prefixes.
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    # unittest.makeSuite() is an obsolete helper (deprecated in 3.11, removed
    # in 3.13).  It simply delegated to a fresh TestLoader, so do that here.
    loader = unittest.TestLoader()
    return unittest.TestSuite(
        [loader.loadTestsFromTestCase(test) for test in tests])
Guido van Rossum28524c72007-02-27 05:47:44 +00003485
# Run the module's tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()