blob: 1cf97dd2e6ef2fa6629ba57b256382d0c98c113a [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Antoine Pitrou712cb732013-12-21 15:51:54 +010038from test.script_helper import assert_python_ok
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010047try:
48 import fcntl
49except ImportError:
50 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000052def _default_chunk_size():
53 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000054 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000055 return f._CHUNK_SIZE
56
57
Antoine Pitrou328ec742010-09-14 18:37:24 +000058class MockRawIOWithoutRead:
59 """A RawIO implementation without read(), so as to exercise the default
60 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000061
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000062 def __init__(self, read_stack=()):
63 self._read_stack = list(read_stack)
64 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000065 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000066 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000067
Guido van Rossum01a27522007-03-07 01:00:12 +000068 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000069 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000070 return len(b)
71
72 def writable(self):
73 return True
74
Guido van Rossum68bbcd22007-02-27 17:19:33 +000075 def fileno(self):
76 return 42
77
78 def readable(self):
79 return True
80
Guido van Rossum01a27522007-03-07 01:00:12 +000081 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000082 return True
83
Guido van Rossum01a27522007-03-07 01:00:12 +000084 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000085 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000086
87 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000088 return 0 # same comment as above
89
90 def readinto(self, buf):
91 self._reads += 1
92 max_len = len(buf)
93 try:
94 data = self._read_stack[0]
95 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000096 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000097 return 0
98 if data is None:
99 del self._read_stack[0]
100 return None
101 n = len(data)
102 if len(data) <= max_len:
103 del self._read_stack[0]
104 buf[:n] = data
105 return n
106 else:
107 buf[:] = data[:max_len]
108 self._read_stack[0] = data[max_len:]
109 return max_len
110
111 def truncate(self, pos=None):
112 return pos
113
Antoine Pitrou328ec742010-09-14 18:37:24 +0000114class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
115 pass
116
117class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
118 pass
119
120
121class MockRawIO(MockRawIOWithoutRead):
122
123 def read(self, n=None):
124 self._reads += 1
125 try:
126 return self._read_stack.pop(0)
127 except:
128 self._extraneous_reads += 1
129 return b""
130
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000131class CMockRawIO(MockRawIO, io.RawIOBase):
132 pass
133
134class PyMockRawIO(MockRawIO, pyio.RawIOBase):
135 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000136
Guido van Rossuma9e20242007-03-08 00:43:48 +0000137
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000138class MisbehavedRawIO(MockRawIO):
139 def write(self, b):
140 return super().write(b) * 2
141
142 def read(self, n=None):
143 return super().read(n) * 2
144
145 def seek(self, pos, whence):
146 return -123
147
148 def tell(self):
149 return -456
150
151 def readinto(self, buf):
152 super().readinto(buf)
153 return len(buf) * 5
154
155class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
156 pass
157
158class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
159 pass
160
161
162class CloseFailureIO(MockRawIO):
163 closed = 0
164
165 def close(self):
166 if not self.closed:
167 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200168 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169
170class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
171 pass
172
173class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
174 pass
175
176
177class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000178
179 def __init__(self, data):
180 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000182
183 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000184 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000185 self.read_history.append(None if res is None else len(res))
186 return res
187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188 def readinto(self, b):
189 res = super().readinto(b)
190 self.read_history.append(res)
191 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000192
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000193class CMockFileIO(MockFileIO, io.BytesIO):
194 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000195
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000196class PyMockFileIO(MockFileIO, pyio.BytesIO):
197 pass
198
199
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000200class MockUnseekableIO:
201 def seekable(self):
202 return False
203
204 def seek(self, *args):
205 raise self.UnsupportedOperation("not seekable")
206
207 def tell(self, *args):
208 raise self.UnsupportedOperation("not seekable")
209
210class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
211 UnsupportedOperation = io.UnsupportedOperation
212
213class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
214 UnsupportedOperation = pyio.UnsupportedOperation
215
216
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000217class MockNonBlockWriterIO:
218
219 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000220 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000221 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000222
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000223 def pop_written(self):
224 s = b"".join(self._write_stack)
225 self._write_stack[:] = []
226 return s
227
228 def block_on(self, char):
229 """Block when a given char is encountered."""
230 self._blocker_char = char
231
232 def readable(self):
233 return True
234
235 def seekable(self):
236 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
Guido van Rossum01a27522007-03-07 01:00:12 +0000238 def writable(self):
239 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000240
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000241 def write(self, b):
242 b = bytes(b)
243 n = -1
244 if self._blocker_char:
245 try:
246 n = b.index(self._blocker_char)
247 except ValueError:
248 pass
249 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100250 if n > 0:
251 # write data up to the first blocker
252 self._write_stack.append(b[:n])
253 return n
254 else:
255 # cancel blocker and indicate would block
256 self._blocker_char = None
257 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000258 self._write_stack.append(b)
259 return len(b)
260
261class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
262 BlockingIOError = io.BlockingIOError
263
264class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
265 BlockingIOError = pyio.BlockingIOError
266
Guido van Rossuma9e20242007-03-08 00:43:48 +0000267
Guido van Rossum28524c72007-02-27 05:47:44 +0000268class IOTest(unittest.TestCase):
269
Neal Norwitze7789b12008-03-24 06:18:09 +0000270 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000271 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000272
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000273 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000274 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000275
Guido van Rossum28524c72007-02-27 05:47:44 +0000276 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000277 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000278 f.truncate(0)
279 self.assertEqual(f.tell(), 5)
280 f.seek(0)
281
282 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000283 self.assertEqual(f.seek(0), 0)
284 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000285 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000286 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000287 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000288 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000289 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000290 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000291 self.assertEqual(f.seek(-1, 2), 13)
292 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293
Guido van Rossum87429772007-04-10 21:06:59 +0000294 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000295 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000296 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000297
Guido van Rossum9b76da62007-04-11 01:09:03 +0000298 def read_ops(self, f, buffered=False):
299 data = f.read(5)
300 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000301 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000302 self.assertEqual(f.readinto(data), 5)
303 self.assertEqual(data, b" worl")
304 self.assertEqual(f.readinto(data), 2)
305 self.assertEqual(len(data), 5)
306 self.assertEqual(data[:2], b"d\n")
307 self.assertEqual(f.seek(0), 0)
308 self.assertEqual(f.read(20), b"hello world\n")
309 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000310 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000311 self.assertEqual(f.seek(-6, 2), 6)
312 self.assertEqual(f.read(5), b"world")
313 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000314 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 self.assertEqual(f.seek(-6, 1), 5)
316 self.assertEqual(f.read(5), b" worl")
317 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000318 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000319 if buffered:
320 f.seek(0)
321 self.assertEqual(f.read(), b"hello world\n")
322 f.seek(6)
323 self.assertEqual(f.read(), b"world\n")
324 self.assertEqual(f.read(), b"")
325
Guido van Rossum34d69e52007-04-10 20:08:41 +0000326 LARGE = 2**31
327
Guido van Rossum53807da2007-04-10 19:01:47 +0000328 def large_file_ops(self, f):
329 assert f.readable()
330 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000331 self.assertEqual(f.seek(self.LARGE), self.LARGE)
332 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000333 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000334 self.assertEqual(f.tell(), self.LARGE + 3)
335 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000336 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000337 self.assertEqual(f.tell(), self.LARGE + 2)
338 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000339 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000340 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000341 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
342 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000343 self.assertEqual(f.read(2), b"x")
344
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000345 def test_invalid_operations(self):
346 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000347 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000348 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000349 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000350 self.assertRaises(exc, fp.read)
351 self.assertRaises(exc, fp.readline)
352 with self.open(support.TESTFN, "wb", buffering=0) as fp:
353 self.assertRaises(exc, fp.read)
354 self.assertRaises(exc, fp.readline)
355 with self.open(support.TESTFN, "rb", buffering=0) as fp:
356 self.assertRaises(exc, fp.write, b"blah")
357 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000358 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000359 self.assertRaises(exc, fp.write, b"blah")
360 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000361 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000362 self.assertRaises(exc, fp.write, "blah")
363 self.assertRaises(exc, fp.writelines, ["blah\n"])
364 # Non-zero seeking from current or end pos
365 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
366 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000367
Antoine Pitrou13348842012-01-29 18:36:34 +0100368 def test_open_handles_NUL_chars(self):
369 fn_with_NUL = 'foo\0bar'
370 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
371 self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
372
Guido van Rossum28524c72007-02-27 05:47:44 +0000373 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000374 with self.open(support.TESTFN, "wb", buffering=0) as f:
375 self.assertEqual(f.readable(), False)
376 self.assertEqual(f.writable(), True)
377 self.assertEqual(f.seekable(), True)
378 self.write_ops(f)
379 with self.open(support.TESTFN, "rb", buffering=0) as f:
380 self.assertEqual(f.readable(), True)
381 self.assertEqual(f.writable(), False)
382 self.assertEqual(f.seekable(), True)
383 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000384
Guido van Rossum87429772007-04-10 21:06:59 +0000385 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000386 with self.open(support.TESTFN, "wb") as f:
387 self.assertEqual(f.readable(), False)
388 self.assertEqual(f.writable(), True)
389 self.assertEqual(f.seekable(), True)
390 self.write_ops(f)
391 with self.open(support.TESTFN, "rb") as f:
392 self.assertEqual(f.readable(), True)
393 self.assertEqual(f.writable(), False)
394 self.assertEqual(f.seekable(), True)
395 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000396
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000397 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000398 with self.open(support.TESTFN, "wb") as f:
399 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
400 with self.open(support.TESTFN, "rb") as f:
401 self.assertEqual(f.readline(), b"abc\n")
402 self.assertEqual(f.readline(10), b"def\n")
403 self.assertEqual(f.readline(2), b"xy")
404 self.assertEqual(f.readline(4), b"zzy\n")
405 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000406 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000407 self.assertRaises(TypeError, f.readline, 5.3)
408 with self.open(support.TESTFN, "r") as f:
409 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000410
Guido van Rossum28524c72007-02-27 05:47:44 +0000411 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000412 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000413 self.write_ops(f)
414 data = f.getvalue()
415 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000416 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000417 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000418
Guido van Rossum53807da2007-04-10 19:01:47 +0000419 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000420 # On Windows and Mac OSX this test comsumes large resources; It takes
421 # a long time to build the >2GB file and takes >2GB of disk space
422 # therefore the resource must be enabled to run this test.
423 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600424 support.requires(
425 'largefile',
426 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000427 with self.open(support.TESTFN, "w+b", 0) as f:
428 self.large_file_ops(f)
429 with self.open(support.TESTFN, "w+b") as f:
430 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000431
432 def test_with_open(self):
433 for bufsize in (0, 1, 100):
434 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000435 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000436 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000437 self.assertEqual(f.closed, True)
438 f = None
439 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000440 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000441 1/0
442 except ZeroDivisionError:
443 self.assertEqual(f.closed, True)
444 else:
445 self.fail("1/0 didn't raise an exception")
446
Antoine Pitrou08838b62009-01-21 00:55:13 +0000447 # issue 5008
448 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000449 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000450 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000451 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000452 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000454 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000455 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000456 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000457
Guido van Rossum87429772007-04-10 21:06:59 +0000458 def test_destructor(self):
459 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000460 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000461 def __del__(self):
462 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000463 try:
464 f = super().__del__
465 except AttributeError:
466 pass
467 else:
468 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000469 def close(self):
470 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000471 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000472 def flush(self):
473 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000474 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000475 with support.check_warnings(('', ResourceWarning)):
476 f = MyFileIO(support.TESTFN, "wb")
477 f.write(b"xxx")
478 del f
479 support.gc_collect()
480 self.assertEqual(record, [1, 2, 3])
481 with self.open(support.TESTFN, "rb") as f:
482 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000483
484 def _check_base_destructor(self, base):
485 record = []
486 class MyIO(base):
487 def __init__(self):
488 # This exercises the availability of attributes on object
489 # destruction.
490 # (in the C version, close() is called by the tp_dealloc
491 # function, not by __del__)
492 self.on_del = 1
493 self.on_close = 2
494 self.on_flush = 3
495 def __del__(self):
496 record.append(self.on_del)
497 try:
498 f = super().__del__
499 except AttributeError:
500 pass
501 else:
502 f()
503 def close(self):
504 record.append(self.on_close)
505 super().close()
506 def flush(self):
507 record.append(self.on_flush)
508 super().flush()
509 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000510 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000511 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000512 self.assertEqual(record, [1, 2, 3])
513
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000514 def test_IOBase_destructor(self):
515 self._check_base_destructor(self.IOBase)
516
517 def test_RawIOBase_destructor(self):
518 self._check_base_destructor(self.RawIOBase)
519
520 def test_BufferedIOBase_destructor(self):
521 self._check_base_destructor(self.BufferedIOBase)
522
523 def test_TextIOBase_destructor(self):
524 self._check_base_destructor(self.TextIOBase)
525
Guido van Rossum87429772007-04-10 21:06:59 +0000526 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000527 with self.open(support.TESTFN, "wb") as f:
528 f.write(b"xxx")
529 with self.open(support.TESTFN, "rb") as f:
530 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000531
Guido van Rossumd4103952007-04-12 05:44:49 +0000532 def test_array_writes(self):
533 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000534 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000535 with self.open(support.TESTFN, "wb", 0) as f:
536 self.assertEqual(f.write(a), n)
537 with self.open(support.TESTFN, "wb") as f:
538 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000539
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000540 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000541 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000542 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000543
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000544 def test_read_closed(self):
545 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000546 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000547 with self.open(support.TESTFN, "r") as f:
548 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000549 self.assertEqual(file.read(), "egg\n")
550 file.seek(0)
551 file.close()
552 self.assertRaises(ValueError, file.read)
553
554 def test_no_closefd_with_filename(self):
555 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000556 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000557
558 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000559 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000560 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000561 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000562 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000563 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000564 self.assertEqual(file.buffer.raw.closefd, False)
565
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000566 def test_garbage_collection(self):
567 # FileIO objects are collected, and collecting them flushes
568 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000569 with support.check_warnings(('', ResourceWarning)):
570 f = self.FileIO(support.TESTFN, "wb")
571 f.write(b"abcxxx")
572 f.f = f
573 wr = weakref.ref(f)
574 del f
575 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000576 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000577 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000578 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000579
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000580 def test_unbounded_file(self):
581 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
582 zero = "/dev/zero"
583 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000584 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000585 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000586 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000587 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000588 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000589 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000590 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000591 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000592 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000593 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000594 self.assertRaises(OverflowError, f.read)
595
Antoine Pitrou6be88762010-05-03 16:48:20 +0000596 def test_flush_error_on_close(self):
597 f = self.open(support.TESTFN, "wb", buffering=0)
598 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200599 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000600 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200601 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600602 self.assertTrue(f.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000603
604 def test_multi_close(self):
605 f = self.open(support.TESTFN, "wb", buffering=0)
606 f.close()
607 f.close()
608 f.close()
609 self.assertRaises(ValueError, f.flush)
610
Antoine Pitrou328ec742010-09-14 18:37:24 +0000611 def test_RawIOBase_read(self):
612 # Exercise the default RawIOBase.read() implementation (which calls
613 # readinto() internally).
614 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
615 self.assertEqual(rawio.read(2), b"ab")
616 self.assertEqual(rawio.read(2), b"c")
617 self.assertEqual(rawio.read(2), b"d")
618 self.assertEqual(rawio.read(2), None)
619 self.assertEqual(rawio.read(2), b"ef")
620 self.assertEqual(rawio.read(2), b"g")
621 self.assertEqual(rawio.read(2), None)
622 self.assertEqual(rawio.read(2), b"")
623
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400624 def test_types_have_dict(self):
625 test = (
626 self.IOBase(),
627 self.RawIOBase(),
628 self.TextIOBase(),
629 self.StringIO(),
630 self.BytesIO()
631 )
632 for obj in test:
633 self.assertTrue(hasattr(obj, "__dict__"))
634
Ross Lagerwall59142db2011-10-31 20:34:46 +0200635 def test_opener(self):
636 with self.open(support.TESTFN, "w") as f:
637 f.write("egg\n")
638 fd = os.open(support.TESTFN, os.O_RDONLY)
639 def opener(path, flags):
640 return fd
641 with self.open("non-existent", "r", opener=opener) as f:
642 self.assertEqual(f.read(), "egg\n")
643
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200644 def test_fileio_closefd(self):
645 # Issue #4841
646 with self.open(__file__, 'rb') as f1, \
647 self.open(__file__, 'rb') as f2:
648 fileio = self.FileIO(f1.fileno(), closefd=False)
649 # .__init__() must not close f1
650 fileio.__init__(f2.fileno(), closefd=False)
651 f1.readline()
652 # .close() must not close f2
653 fileio.close()
654 f2.readline()
655
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300656 def test_nonbuffered_textio(self):
657 with warnings.catch_warnings(record=True) as recorded:
658 with self.assertRaises(ValueError):
659 self.open(support.TESTFN, 'w', buffering=0)
660 support.gc_collect()
661 self.assertEqual(recorded, [])
662
663 def test_invalid_newline(self):
664 with warnings.catch_warnings(record=True) as recorded:
665 with self.assertRaises(ValueError):
666 self.open(support.TESTFN, 'w', newline='invalid')
667 support.gc_collect()
668 self.assertEqual(recorded, [])
669
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200670
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000671class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200672
673 def test_IOBase_finalize(self):
674 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
675 # class which inherits IOBase and an object of this class are caught
676 # in a reference cycle and close() is already in the method cache.
677 class MyIO(self.IOBase):
678 def close(self):
679 pass
680
681 # create an instance to populate the method cache
682 MyIO()
683 obj = MyIO()
684 obj.obj = obj
685 wr = weakref.ref(obj)
686 del MyIO
687 del obj
688 support.gc_collect()
689 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000690
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000691class PyIOTest(IOTest):
692 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000693
Guido van Rossuma9e20242007-03-08 00:43:48 +0000694
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000695class CommonBufferedTests:
696 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
697
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000698 def test_detach(self):
699 raw = self.MockRawIO()
700 buf = self.tp(raw)
701 self.assertIs(buf.detach(), raw)
702 self.assertRaises(ValueError, buf.detach)
703
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000704 def test_fileno(self):
705 rawio = self.MockRawIO()
706 bufio = self.tp(rawio)
707
Ezio Melottib3aedd42010-11-20 19:04:17 +0000708 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000709
Zachary Ware9fe6d862013-12-08 00:20:35 -0600710 @unittest.skip('test having existential crisis')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000711 def test_no_fileno(self):
712 # XXX will we always have fileno() function? If so, kill
713 # this test. Else, write it.
714 pass
715
716 def test_invalid_args(self):
717 rawio = self.MockRawIO()
718 bufio = self.tp(rawio)
719 # Invalid whence
720 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200721 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000722
723 def test_override_destructor(self):
724 tp = self.tp
725 record = []
726 class MyBufferedIO(tp):
727 def __del__(self):
728 record.append(1)
729 try:
730 f = super().__del__
731 except AttributeError:
732 pass
733 else:
734 f()
735 def close(self):
736 record.append(2)
737 super().close()
738 def flush(self):
739 record.append(3)
740 super().flush()
741 rawio = self.MockRawIO()
742 bufio = MyBufferedIO(rawio)
743 writable = bufio.writable()
744 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000745 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000746 if writable:
747 self.assertEqual(record, [1, 2, 3])
748 else:
749 self.assertEqual(record, [1, 2])
750
751 def test_context_manager(self):
752 # Test usability as a context manager
753 rawio = self.MockRawIO()
754 bufio = self.tp(rawio)
755 def _with():
756 with bufio:
757 pass
758 _with()
759 # bufio should now be closed, and using it a second time should raise
760 # a ValueError.
761 self.assertRaises(ValueError, _with)
762
763 def test_error_through_destructor(self):
764 # Test that the exception state is not modified by a destructor,
765 # even if close() fails.
766 rawio = self.CloseFailureIO()
767 def f():
768 self.tp(rawio).xyzzy
769 with support.captured_output("stderr") as s:
770 self.assertRaises(AttributeError, f)
771 s = s.getvalue().strip()
772 if s:
773 # The destructor *may* have printed an unraisable error, check it
774 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200775 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000776 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000777
Antoine Pitrou716c4442009-05-23 19:04:03 +0000778 def test_repr(self):
779 raw = self.MockRawIO()
780 b = self.tp(raw)
781 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
782 self.assertEqual(repr(b), "<%s>" % clsname)
783 raw.name = "dummy"
784 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
785 raw.name = b"dummy"
786 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
787
Antoine Pitrou6be88762010-05-03 16:48:20 +0000788 def test_flush_error_on_close(self):
789 raw = self.MockRawIO()
790 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200791 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000792 raw.flush = bad_flush
793 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200794 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600795 self.assertTrue(b.closed)
796
797 def test_close_error_on_close(self):
798 raw = self.MockRawIO()
799 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200800 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -0600801 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200802 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -0600803 raw.close = bad_close
804 b = self.tp(raw)
805 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200806 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600807 b.close()
808 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +0300809 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -0600810 self.assertEqual(err.exception.__context__.args, ('flush',))
811 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000812
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +0300813 def test_nonnormalized_close_error_on_close(self):
814 # Issue #21677
815 raw = self.MockRawIO()
816 def bad_flush():
817 raise non_existing_flush
818 def bad_close():
819 raise non_existing_close
820 raw.close = bad_close
821 b = self.tp(raw)
822 b.flush = bad_flush
823 with self.assertRaises(NameError) as err: # exception not swallowed
824 b.close()
825 self.assertIn('non_existing_close', str(err.exception))
826 self.assertIsInstance(err.exception.__context__, NameError)
827 self.assertIn('non_existing_flush', str(err.exception.__context__))
828 self.assertFalse(b.closed)
829
Antoine Pitrou6be88762010-05-03 16:48:20 +0000830 def test_multi_close(self):
831 raw = self.MockRawIO()
832 b = self.tp(raw)
833 b.close()
834 b.close()
835 b.close()
836 self.assertRaises(ValueError, b.flush)
837
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000838 def test_unseekable(self):
839 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
840 self.assertRaises(self.UnsupportedOperation, bufio.tell)
841 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
842
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000843 def test_readonly_attributes(self):
844 raw = self.MockRawIO()
845 buf = self.tp(raw)
846 x = self.MockRawIO()
847 with self.assertRaises(AttributeError):
848 buf.raw = x
849
Guido van Rossum78892e42007-04-06 17:31:18 +0000850
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200851class SizeofTest:
852
853 @support.cpython_only
854 def test_sizeof(self):
855 bufsize1 = 4096
856 bufsize2 = 8192
857 rawio = self.MockRawIO()
858 bufio = self.tp(rawio, buffer_size=bufsize1)
859 size = sys.getsizeof(bufio) - bufsize1
860 rawio = self.MockRawIO()
861 bufio = self.tp(rawio, buffer_size=bufsize2)
862 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
863
Jesus Ceadc469452012-10-04 12:37:56 +0200864 @support.cpython_only
865 def test_buffer_freeing(self) :
866 bufsize = 4096
867 rawio = self.MockRawIO()
868 bufio = self.tp(rawio, buffer_size=bufsize)
869 size = sys.getsizeof(bufio) - bufsize
870 bufio.close()
871 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200872
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000873class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
874 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000875
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000876 def test_constructor(self):
877 rawio = self.MockRawIO([b"abc"])
878 bufio = self.tp(rawio)
879 bufio.__init__(rawio)
880 bufio.__init__(rawio, buffer_size=1024)
881 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000882 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000883 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
884 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
885 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
886 rawio = self.MockRawIO([b"abc"])
887 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000888 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000889
Serhiy Storchaka61e24932014-02-12 10:52:35 +0200890 def test_uninitialized(self):
891 bufio = self.tp.__new__(self.tp)
892 del bufio
893 bufio = self.tp.__new__(self.tp)
894 self.assertRaisesRegex((ValueError, AttributeError),
895 'uninitialized|has no attribute',
896 bufio.read, 0)
897 bufio.__init__(self.MockRawIO())
898 self.assertEqual(bufio.read(0), b'')
899
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000900 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000901 for arg in (None, 7):
902 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
903 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000904 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000905 # Invalid args
906 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000907
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000908 def test_read1(self):
909 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
910 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000911 self.assertEqual(b"a", bufio.read(1))
912 self.assertEqual(b"b", bufio.read1(1))
913 self.assertEqual(rawio._reads, 1)
914 self.assertEqual(b"c", bufio.read1(100))
915 self.assertEqual(rawio._reads, 1)
916 self.assertEqual(b"d", bufio.read1(100))
917 self.assertEqual(rawio._reads, 2)
918 self.assertEqual(b"efg", bufio.read1(100))
919 self.assertEqual(rawio._reads, 3)
920 self.assertEqual(b"", bufio.read1(100))
921 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000922 # Invalid args
923 self.assertRaises(ValueError, bufio.read1, -1)
924
925 def test_readinto(self):
926 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
927 bufio = self.tp(rawio)
928 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000929 self.assertEqual(bufio.readinto(b), 2)
930 self.assertEqual(b, b"ab")
931 self.assertEqual(bufio.readinto(b), 2)
932 self.assertEqual(b, b"cd")
933 self.assertEqual(bufio.readinto(b), 2)
934 self.assertEqual(b, b"ef")
935 self.assertEqual(bufio.readinto(b), 1)
936 self.assertEqual(b, b"gf")
937 self.assertEqual(bufio.readinto(b), 0)
938 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +0200939 rawio = self.MockRawIO((b"abc", None))
940 bufio = self.tp(rawio)
941 self.assertEqual(bufio.readinto(b), 2)
942 self.assertEqual(b, b"ab")
943 self.assertEqual(bufio.readinto(b), 1)
944 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000945
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000946 def test_readlines(self):
947 def bufio():
948 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
949 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000950 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
951 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
952 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000953
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000954 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000955 data = b"abcdefghi"
956 dlen = len(data)
957
958 tests = [
959 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
960 [ 100, [ 3, 3, 3], [ dlen ] ],
961 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
962 ]
963
964 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000965 rawio = self.MockFileIO(data)
966 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000967 pos = 0
968 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000969 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000970 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000971 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000972 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000973
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000974 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000975 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000976 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
977 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000978 self.assertEqual(b"abcd", bufio.read(6))
979 self.assertEqual(b"e", bufio.read(1))
980 self.assertEqual(b"fg", bufio.read())
981 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200982 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000983 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000984
Victor Stinnera80987f2011-05-25 22:47:16 +0200985 rawio = self.MockRawIO((b"a", None, None))
986 self.assertEqual(b"a", rawio.readall())
987 self.assertIsNone(rawio.readall())
988
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000989 def test_read_past_eof(self):
990 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
991 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000992
Ezio Melottib3aedd42010-11-20 19:04:17 +0000993 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000994
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000995 def test_read_all(self):
996 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
997 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000998
Ezio Melottib3aedd42010-11-20 19:04:17 +0000999 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001000
Victor Stinner45df8202010-04-28 22:31:17 +00001001 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001002 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001003 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001004 try:
1005 # Write out many bytes with exactly the same number of 0's,
1006 # 1's... 255's. This will help us check that concurrent reading
1007 # doesn't duplicate or forget contents.
1008 N = 1000
1009 l = list(range(256)) * N
1010 random.shuffle(l)
1011 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001012 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001013 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001014 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001015 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001016 errors = []
1017 results = []
1018 def f():
1019 try:
1020 # Intra-buffer read then buffer-flushing read
1021 for n in cycle([1, 19]):
1022 s = bufio.read(n)
1023 if not s:
1024 break
1025 # list.append() is atomic
1026 results.append(s)
1027 except Exception as e:
1028 errors.append(e)
1029 raise
1030 threads = [threading.Thread(target=f) for x in range(20)]
1031 for t in threads:
1032 t.start()
1033 time.sleep(0.02) # yield
1034 for t in threads:
1035 t.join()
1036 self.assertFalse(errors,
1037 "the following exceptions were caught: %r" % errors)
1038 s = b''.join(results)
1039 for i in range(256):
1040 c = bytes(bytearray([i]))
1041 self.assertEqual(s.count(c), N)
1042 finally:
1043 support.unlink(support.TESTFN)
1044
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001045 def test_unseekable(self):
1046 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1047 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1048 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1049 bufio.read(1)
1050 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1051 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1052
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001053 def test_misbehaved_io(self):
1054 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1055 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001056 self.assertRaises(OSError, bufio.seek, 0)
1057 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001058
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001059 def test_no_extraneous_read(self):
1060 # Issue #9550; when the raw IO object has satisfied the read request,
1061 # we should not issue any additional reads, otherwise it may block
1062 # (e.g. socket).
1063 bufsize = 16
1064 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1065 rawio = self.MockRawIO([b"x" * n])
1066 bufio = self.tp(rawio, bufsize)
1067 self.assertEqual(bufio.read(n), b"x" * n)
1068 # Simple case: one raw read is enough to satisfy the request.
1069 self.assertEqual(rawio._extraneous_reads, 0,
1070 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1071 # A more complex case where two raw reads are needed to satisfy
1072 # the request.
1073 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1074 bufio = self.tp(rawio, bufsize)
1075 self.assertEqual(bufio.read(n), b"x" * n)
1076 self.assertEqual(rawio._extraneous_reads, 0,
1077 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1078
1079
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001080class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001081 tp = io.BufferedReader
1082
1083 def test_constructor(self):
1084 BufferedReaderTest.test_constructor(self)
1085 # The allocation can succeed on 32-bit builds, e.g. with more
1086 # than 2GB RAM and a 64-bit kernel.
1087 if sys.maxsize > 0x7FFFFFFF:
1088 rawio = self.MockRawIO()
1089 bufio = self.tp(rawio)
1090 self.assertRaises((OverflowError, MemoryError, ValueError),
1091 bufio.__init__, rawio, sys.maxsize)
1092
1093 def test_initialization(self):
1094 rawio = self.MockRawIO([b"abc"])
1095 bufio = self.tp(rawio)
1096 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1097 self.assertRaises(ValueError, bufio.read)
1098 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1099 self.assertRaises(ValueError, bufio.read)
1100 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1101 self.assertRaises(ValueError, bufio.read)
1102
1103 def test_misbehaved_io_read(self):
1104 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1105 bufio = self.tp(rawio)
1106 # _pyio.BufferedReader seems to implement reading different, so that
1107 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001108 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001109
1110 def test_garbage_collection(self):
1111 # C BufferedReader objects are collected.
1112 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001113 with support.check_warnings(('', ResourceWarning)):
1114 rawio = self.FileIO(support.TESTFN, "w+b")
1115 f = self.tp(rawio)
1116 f.f = f
1117 wr = weakref.ref(f)
1118 del f
1119 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001120 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001121
R David Murray67bfe802013-02-23 21:51:05 -05001122 def test_args_error(self):
1123 # Issue #17275
1124 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1125 self.tp(io.BytesIO(), 1024, 1024, 1024)
1126
1127
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001128class PyBufferedReaderTest(BufferedReaderTest):
1129 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001130
Guido van Rossuma9e20242007-03-08 00:43:48 +00001131
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001132class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1133 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001134
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001135 def test_constructor(self):
1136 rawio = self.MockRawIO()
1137 bufio = self.tp(rawio)
1138 bufio.__init__(rawio)
1139 bufio.__init__(rawio, buffer_size=1024)
1140 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001141 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001142 bufio.flush()
1143 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1144 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1145 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1146 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001147 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001148 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001149 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001150
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001151 def test_uninitialized(self):
1152 bufio = self.tp.__new__(self.tp)
1153 del bufio
1154 bufio = self.tp.__new__(self.tp)
1155 self.assertRaisesRegex((ValueError, AttributeError),
1156 'uninitialized|has no attribute',
1157 bufio.write, b'')
1158 bufio.__init__(self.MockRawIO())
1159 self.assertEqual(bufio.write(b''), 0)
1160
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001161 def test_detach_flush(self):
1162 raw = self.MockRawIO()
1163 buf = self.tp(raw)
1164 buf.write(b"howdy!")
1165 self.assertFalse(raw._write_stack)
1166 buf.detach()
1167 self.assertEqual(raw._write_stack, [b"howdy!"])
1168
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001169 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001170 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001171 writer = self.MockRawIO()
1172 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001173 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001174 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001175
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001176 def test_write_overflow(self):
1177 writer = self.MockRawIO()
1178 bufio = self.tp(writer, 8)
1179 contents = b"abcdefghijklmnop"
1180 for n in range(0, len(contents), 3):
1181 bufio.write(contents[n:n+3])
1182 flushed = b"".join(writer._write_stack)
1183 # At least (total - 8) bytes were implicitly flushed, perhaps more
1184 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001185 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001186
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001187 def check_writes(self, intermediate_func):
1188 # Lots of writes, test the flushed output is as expected.
1189 contents = bytes(range(256)) * 1000
1190 n = 0
1191 writer = self.MockRawIO()
1192 bufio = self.tp(writer, 13)
1193 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1194 def gen_sizes():
1195 for size in count(1):
1196 for i in range(15):
1197 yield size
1198 sizes = gen_sizes()
1199 while n < len(contents):
1200 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001201 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001202 intermediate_func(bufio)
1203 n += size
1204 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001205 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001206
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001207 def test_writes(self):
1208 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001209
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001210 def test_writes_and_flushes(self):
1211 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001212
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001213 def test_writes_and_seeks(self):
1214 def _seekabs(bufio):
1215 pos = bufio.tell()
1216 bufio.seek(pos + 1, 0)
1217 bufio.seek(pos - 1, 0)
1218 bufio.seek(pos, 0)
1219 self.check_writes(_seekabs)
1220 def _seekrel(bufio):
1221 pos = bufio.seek(0, 1)
1222 bufio.seek(+1, 1)
1223 bufio.seek(-1, 1)
1224 bufio.seek(pos, 0)
1225 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001226
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001227 def test_writes_and_truncates(self):
1228 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001229
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001230 def test_write_non_blocking(self):
1231 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001232 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001233
Ezio Melottib3aedd42010-11-20 19:04:17 +00001234 self.assertEqual(bufio.write(b"abcd"), 4)
1235 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001236 # 1 byte will be written, the rest will be buffered
1237 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001238 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001239
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001240 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1241 raw.block_on(b"0")
1242 try:
1243 bufio.write(b"opqrwxyz0123456789")
1244 except self.BlockingIOError as e:
1245 written = e.characters_written
1246 else:
1247 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001248 self.assertEqual(written, 16)
1249 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001250 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001251
Ezio Melottib3aedd42010-11-20 19:04:17 +00001252 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001253 s = raw.pop_written()
1254 # Previously buffered bytes were flushed
1255 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001256
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001257 def test_write_and_rewind(self):
1258 raw = io.BytesIO()
1259 bufio = self.tp(raw, 4)
1260 self.assertEqual(bufio.write(b"abcdef"), 6)
1261 self.assertEqual(bufio.tell(), 6)
1262 bufio.seek(0, 0)
1263 self.assertEqual(bufio.write(b"XY"), 2)
1264 bufio.seek(6, 0)
1265 self.assertEqual(raw.getvalue(), b"XYcdef")
1266 self.assertEqual(bufio.write(b"123456"), 6)
1267 bufio.flush()
1268 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001269
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001270 def test_flush(self):
1271 writer = self.MockRawIO()
1272 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001273 bufio.write(b"abc")
1274 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001275 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001276
Antoine Pitrou131a4892012-10-16 22:57:11 +02001277 def test_writelines(self):
1278 l = [b'ab', b'cd', b'ef']
1279 writer = self.MockRawIO()
1280 bufio = self.tp(writer, 8)
1281 bufio.writelines(l)
1282 bufio.flush()
1283 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1284
1285 def test_writelines_userlist(self):
1286 l = UserList([b'ab', b'cd', b'ef'])
1287 writer = self.MockRawIO()
1288 bufio = self.tp(writer, 8)
1289 bufio.writelines(l)
1290 bufio.flush()
1291 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1292
1293 def test_writelines_error(self):
1294 writer = self.MockRawIO()
1295 bufio = self.tp(writer, 8)
1296 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1297 self.assertRaises(TypeError, bufio.writelines, None)
1298 self.assertRaises(TypeError, bufio.writelines, 'abc')
1299
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001300 def test_destructor(self):
1301 writer = self.MockRawIO()
1302 bufio = self.tp(writer, 8)
1303 bufio.write(b"abc")
1304 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001305 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001306 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001307
1308 def test_truncate(self):
1309 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001310 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001311 bufio = self.tp(raw, 8)
1312 bufio.write(b"abcdef")
1313 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001314 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001315 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001316 self.assertEqual(f.read(), b"abc")
1317
Victor Stinner45df8202010-04-28 22:31:17 +00001318 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001319 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001320 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001321 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001322 # Write out many bytes from many threads and test they were
1323 # all flushed.
1324 N = 1000
1325 contents = bytes(range(256)) * N
1326 sizes = cycle([1, 19])
1327 n = 0
1328 queue = deque()
1329 while n < len(contents):
1330 size = next(sizes)
1331 queue.append(contents[n:n+size])
1332 n += size
1333 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001334 # We use a real file object because it allows us to
1335 # exercise situations where the GIL is released before
1336 # writing the buffer to the raw streams. This is in addition
1337 # to concurrency issues due to switching threads in the middle
1338 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001339 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001340 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001341 errors = []
1342 def f():
1343 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001344 while True:
1345 try:
1346 s = queue.popleft()
1347 except IndexError:
1348 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001349 bufio.write(s)
1350 except Exception as e:
1351 errors.append(e)
1352 raise
1353 threads = [threading.Thread(target=f) for x in range(20)]
1354 for t in threads:
1355 t.start()
1356 time.sleep(0.02) # yield
1357 for t in threads:
1358 t.join()
1359 self.assertFalse(errors,
1360 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001361 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001362 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001363 s = f.read()
1364 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001365 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001366 finally:
1367 support.unlink(support.TESTFN)
1368
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001369 def test_misbehaved_io(self):
1370 rawio = self.MisbehavedRawIO()
1371 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001372 self.assertRaises(OSError, bufio.seek, 0)
1373 self.assertRaises(OSError, bufio.tell)
1374 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001375
Florent Xicluna109d5732012-07-07 17:03:22 +02001376 def test_max_buffer_size_removal(self):
1377 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001378 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001379
Benjamin Peterson68623612012-12-20 11:53:11 -06001380 def test_write_error_on_close(self):
1381 raw = self.MockRawIO()
1382 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001383 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001384 raw.write = bad_write
1385 b = self.tp(raw)
1386 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001387 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001388 self.assertTrue(b.closed)
1389
Benjamin Peterson59406a92009-03-26 17:10:29 +00001390
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001391class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001392 tp = io.BufferedWriter
1393
1394 def test_constructor(self):
1395 BufferedWriterTest.test_constructor(self)
1396 # The allocation can succeed on 32-bit builds, e.g. with more
1397 # than 2GB RAM and a 64-bit kernel.
1398 if sys.maxsize > 0x7FFFFFFF:
1399 rawio = self.MockRawIO()
1400 bufio = self.tp(rawio)
1401 self.assertRaises((OverflowError, MemoryError, ValueError),
1402 bufio.__init__, rawio, sys.maxsize)
1403
1404 def test_initialization(self):
1405 rawio = self.MockRawIO()
1406 bufio = self.tp(rawio)
1407 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1408 self.assertRaises(ValueError, bufio.write, b"def")
1409 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1410 self.assertRaises(ValueError, bufio.write, b"def")
1411 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1412 self.assertRaises(ValueError, bufio.write, b"def")
1413
1414 def test_garbage_collection(self):
1415 # C BufferedWriter objects are collected, and collecting them flushes
1416 # all data to disk.
1417 # The Python version has __del__, so it ends into gc.garbage instead
Antoine Pitrou796564c2013-07-30 19:59:21 +02001418 with support.check_warnings(('', ResourceWarning)):
1419 rawio = self.FileIO(support.TESTFN, "w+b")
1420 f = self.tp(rawio)
1421 f.write(b"123xxx")
1422 f.x = f
1423 wr = weakref.ref(f)
1424 del f
1425 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001426 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001427 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001428 self.assertEqual(f.read(), b"123xxx")
1429
R David Murray67bfe802013-02-23 21:51:05 -05001430 def test_args_error(self):
1431 # Issue #17275
1432 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1433 self.tp(io.BytesIO(), 1024, 1024, 1024)
1434
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001435
1436class PyBufferedWriterTest(BufferedWriterTest):
1437 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001438
Guido van Rossum01a27522007-03-07 01:00:12 +00001439class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001440
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001441 def test_constructor(self):
1442 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001443 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001444
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001445 def test_uninitialized(self):
1446 pair = self.tp.__new__(self.tp)
1447 del pair
1448 pair = self.tp.__new__(self.tp)
1449 self.assertRaisesRegex((ValueError, AttributeError),
1450 'uninitialized|has no attribute',
1451 pair.read, 0)
1452 self.assertRaisesRegex((ValueError, AttributeError),
1453 'uninitialized|has no attribute',
1454 pair.write, b'')
1455 pair.__init__(self.MockRawIO(), self.MockRawIO())
1456 self.assertEqual(pair.read(0), b'')
1457 self.assertEqual(pair.write(b''), 0)
1458
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001459 def test_detach(self):
1460 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1461 self.assertRaises(self.UnsupportedOperation, pair.detach)
1462
Florent Xicluna109d5732012-07-07 17:03:22 +02001463 def test_constructor_max_buffer_size_removal(self):
1464 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001465 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001466
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001467 def test_constructor_with_not_readable(self):
1468 class NotReadable(MockRawIO):
1469 def readable(self):
1470 return False
1471
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001472 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001473
1474 def test_constructor_with_not_writeable(self):
1475 class NotWriteable(MockRawIO):
1476 def writable(self):
1477 return False
1478
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001479 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001480
1481 def test_read(self):
1482 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1483
1484 self.assertEqual(pair.read(3), b"abc")
1485 self.assertEqual(pair.read(1), b"d")
1486 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001487 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1488 self.assertEqual(pair.read(None), b"abc")
1489
1490 def test_readlines(self):
1491 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1492 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1493 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1494 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001495
1496 def test_read1(self):
1497 # .read1() is delegated to the underlying reader object, so this test
1498 # can be shallow.
1499 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1500
1501 self.assertEqual(pair.read1(3), b"abc")
1502
1503 def test_readinto(self):
1504 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1505
1506 data = bytearray(5)
1507 self.assertEqual(pair.readinto(data), 5)
1508 self.assertEqual(data, b"abcde")
1509
1510 def test_write(self):
1511 w = self.MockRawIO()
1512 pair = self.tp(self.MockRawIO(), w)
1513
1514 pair.write(b"abc")
1515 pair.flush()
1516 pair.write(b"def")
1517 pair.flush()
1518 self.assertEqual(w._write_stack, [b"abc", b"def"])
1519
1520 def test_peek(self):
1521 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1522
1523 self.assertTrue(pair.peek(3).startswith(b"abc"))
1524 self.assertEqual(pair.read(3), b"abc")
1525
1526 def test_readable(self):
1527 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1528 self.assertTrue(pair.readable())
1529
1530 def test_writeable(self):
1531 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1532 self.assertTrue(pair.writable())
1533
1534 def test_seekable(self):
1535 # BufferedRWPairs are never seekable, even if their readers and writers
1536 # are.
1537 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1538 self.assertFalse(pair.seekable())
1539
1540 # .flush() is delegated to the underlying writer object and has been
1541 # tested in the test_write method.
1542
1543 def test_close_and_closed(self):
1544 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1545 self.assertFalse(pair.closed)
1546 pair.close()
1547 self.assertTrue(pair.closed)
1548
1549 def test_isatty(self):
1550 class SelectableIsAtty(MockRawIO):
1551 def __init__(self, isatty):
1552 MockRawIO.__init__(self)
1553 self._isatty = isatty
1554
1555 def isatty(self):
1556 return self._isatty
1557
1558 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1559 self.assertFalse(pair.isatty())
1560
1561 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1562 self.assertTrue(pair.isatty())
1563
1564 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1565 self.assertTrue(pair.isatty())
1566
1567 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1568 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001569
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001570class CBufferedRWPairTest(BufferedRWPairTest):
1571 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001572
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001573class PyBufferedRWPairTest(BufferedRWPairTest):
1574 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001575
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001576
1577class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1578 read_mode = "rb+"
1579 write_mode = "wb+"
1580
1581 def test_constructor(self):
1582 BufferedReaderTest.test_constructor(self)
1583 BufferedWriterTest.test_constructor(self)
1584
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001585 def test_uninitialized(self):
1586 BufferedReaderTest.test_uninitialized(self)
1587 BufferedWriterTest.test_uninitialized(self)
1588
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001589 def test_read_and_write(self):
1590 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001591 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001592
1593 self.assertEqual(b"as", rw.read(2))
1594 rw.write(b"ddd")
1595 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001596 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001597 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001598 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001599
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001600 def test_seek_and_tell(self):
1601 raw = self.BytesIO(b"asdfghjkl")
1602 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001603
Ezio Melottib3aedd42010-11-20 19:04:17 +00001604 self.assertEqual(b"as", rw.read(2))
1605 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001606 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001607 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001608
Antoine Pitroue05565e2011-08-20 14:39:23 +02001609 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001610 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001611 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001612 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001613 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001614 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001615 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001616 self.assertEqual(7, rw.tell())
1617 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001618 rw.flush()
1619 self.assertEqual(b"asdf123fl", raw.getvalue())
1620
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001621 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001622
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001623 def check_flush_and_read(self, read_func):
1624 raw = self.BytesIO(b"abcdefghi")
1625 bufio = self.tp(raw)
1626
Ezio Melottib3aedd42010-11-20 19:04:17 +00001627 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001628 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001629 self.assertEqual(b"ef", read_func(bufio, 2))
1630 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001631 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001632 self.assertEqual(6, bufio.tell())
1633 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001634 raw.seek(0, 0)
1635 raw.write(b"XYZ")
1636 # flush() resets the read buffer
1637 bufio.flush()
1638 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001639 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001640
1641 def test_flush_and_read(self):
1642 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1643
1644 def test_flush_and_readinto(self):
1645 def _readinto(bufio, n=-1):
1646 b = bytearray(n if n >= 0 else 9999)
1647 n = bufio.readinto(b)
1648 return bytes(b[:n])
1649 self.check_flush_and_read(_readinto)
1650
1651 def test_flush_and_peek(self):
1652 def _peek(bufio, n=-1):
1653 # This relies on the fact that the buffer can contain the whole
1654 # raw stream, otherwise peek() can return less.
1655 b = bufio.peek(n)
1656 if n != -1:
1657 b = b[:n]
1658 bufio.seek(len(b), 1)
1659 return b
1660 self.check_flush_and_read(_peek)
1661
1662 def test_flush_and_write(self):
1663 raw = self.BytesIO(b"abcdefghi")
1664 bufio = self.tp(raw)
1665
1666 bufio.write(b"123")
1667 bufio.flush()
1668 bufio.write(b"45")
1669 bufio.flush()
1670 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001671 self.assertEqual(b"12345fghi", raw.getvalue())
1672 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001673
1674 def test_threads(self):
1675 BufferedReaderTest.test_threads(self)
1676 BufferedWriterTest.test_threads(self)
1677
1678 def test_writes_and_peek(self):
1679 def _peek(bufio):
1680 bufio.peek(1)
1681 self.check_writes(_peek)
1682 def _peek(bufio):
1683 pos = bufio.tell()
1684 bufio.seek(-1, 1)
1685 bufio.peek(1)
1686 bufio.seek(pos, 0)
1687 self.check_writes(_peek)
1688
1689 def test_writes_and_reads(self):
1690 def _read(bufio):
1691 bufio.seek(-1, 1)
1692 bufio.read(1)
1693 self.check_writes(_read)
1694
1695 def test_writes_and_read1s(self):
1696 def _read1(bufio):
1697 bufio.seek(-1, 1)
1698 bufio.read1(1)
1699 self.check_writes(_read1)
1700
1701 def test_writes_and_readintos(self):
1702 def _read(bufio):
1703 bufio.seek(-1, 1)
1704 bufio.readinto(bytearray(1))
1705 self.check_writes(_read)
1706
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001707 def test_write_after_readahead(self):
1708 # Issue #6629: writing after the buffer was filled by readahead should
1709 # first rewind the raw stream.
1710 for overwrite_size in [1, 5]:
1711 raw = self.BytesIO(b"A" * 10)
1712 bufio = self.tp(raw, 4)
1713 # Trigger readahead
1714 self.assertEqual(bufio.read(1), b"A")
1715 self.assertEqual(bufio.tell(), 1)
1716 # Overwriting should rewind the raw stream if it needs so
1717 bufio.write(b"B" * overwrite_size)
1718 self.assertEqual(bufio.tell(), overwrite_size + 1)
1719 # If the write size was smaller than the buffer size, flush() and
1720 # check that rewind happens.
1721 bufio.flush()
1722 self.assertEqual(bufio.tell(), overwrite_size + 1)
1723 s = raw.getvalue()
1724 self.assertEqual(s,
1725 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1726
Antoine Pitrou7c404892011-05-13 00:13:33 +02001727 def test_write_rewind_write(self):
1728 # Various combinations of reading / writing / seeking backwards / writing again
1729 def mutate(bufio, pos1, pos2):
1730 assert pos2 >= pos1
1731 # Fill the buffer
1732 bufio.seek(pos1)
1733 bufio.read(pos2 - pos1)
1734 bufio.write(b'\x02')
1735 # This writes earlier than the previous write, but still inside
1736 # the buffer.
1737 bufio.seek(pos1)
1738 bufio.write(b'\x01')
1739
1740 b = b"\x80\x81\x82\x83\x84"
1741 for i in range(0, len(b)):
1742 for j in range(i, len(b)):
1743 raw = self.BytesIO(b)
1744 bufio = self.tp(raw, 100)
1745 mutate(bufio, i, j)
1746 bufio.flush()
1747 expected = bytearray(b)
1748 expected[j] = 2
1749 expected[i] = 1
1750 self.assertEqual(raw.getvalue(), expected,
1751 "failed result for i=%d, j=%d" % (i, j))
1752
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001753 def test_truncate_after_read_or_write(self):
1754 raw = self.BytesIO(b"A" * 10)
1755 bufio = self.tp(raw, 100)
1756 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1757 self.assertEqual(bufio.truncate(), 2)
1758 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1759 self.assertEqual(bufio.truncate(), 4)
1760
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001761 def test_misbehaved_io(self):
1762 BufferedReaderTest.test_misbehaved_io(self)
1763 BufferedWriterTest.test_misbehaved_io(self)
1764
Antoine Pitroue05565e2011-08-20 14:39:23 +02001765 def test_interleaved_read_write(self):
1766 # Test for issue #12213
1767 with self.BytesIO(b'abcdefgh') as raw:
1768 with self.tp(raw, 100) as f:
1769 f.write(b"1")
1770 self.assertEqual(f.read(1), b'b')
1771 f.write(b'2')
1772 self.assertEqual(f.read1(1), b'd')
1773 f.write(b'3')
1774 buf = bytearray(1)
1775 f.readinto(buf)
1776 self.assertEqual(buf, b'f')
1777 f.write(b'4')
1778 self.assertEqual(f.peek(1), b'h')
1779 f.flush()
1780 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1781
1782 with self.BytesIO(b'abc') as raw:
1783 with self.tp(raw, 100) as f:
1784 self.assertEqual(f.read(1), b'a')
1785 f.write(b"2")
1786 self.assertEqual(f.read(1), b'c')
1787 f.flush()
1788 self.assertEqual(raw.getvalue(), b'a2c')
1789
1790 def test_interleaved_readline_write(self):
1791 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1792 with self.tp(raw) as f:
1793 f.write(b'1')
1794 self.assertEqual(f.readline(), b'b\n')
1795 f.write(b'2')
1796 self.assertEqual(f.readline(), b'def\n')
1797 f.write(b'3')
1798 self.assertEqual(f.readline(), b'\n')
1799 f.flush()
1800 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1801
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001802 # You can't construct a BufferedRandom over a non-seekable stream.
1803 test_unseekable = None
1804
R David Murray67bfe802013-02-23 21:51:05 -05001805
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001806class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001807 tp = io.BufferedRandom
1808
1809 def test_constructor(self):
1810 BufferedRandomTest.test_constructor(self)
1811 # The allocation can succeed on 32-bit builds, e.g. with more
1812 # than 2GB RAM and a 64-bit kernel.
1813 if sys.maxsize > 0x7FFFFFFF:
1814 rawio = self.MockRawIO()
1815 bufio = self.tp(rawio)
1816 self.assertRaises((OverflowError, MemoryError, ValueError),
1817 bufio.__init__, rawio, sys.maxsize)
1818
1819 def test_garbage_collection(self):
1820 CBufferedReaderTest.test_garbage_collection(self)
1821 CBufferedWriterTest.test_garbage_collection(self)
1822
R David Murray67bfe802013-02-23 21:51:05 -05001823 def test_args_error(self):
1824 # Issue #17275
1825 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
1826 self.tp(io.BytesIO(), 1024, 1024, 1024)
1827
1828
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001829class PyBufferedRandomTest(BufferedRandomTest):
1830 tp = pyio.BufferedRandom
1831
1832
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001833# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1834# properties:
1835# - A single output character can correspond to many bytes of input.
1836# - The number of input bytes to complete the character can be
1837# undetermined until the last input byte is received.
1838# - The number of input bytes can vary depending on previous input.
1839# - A single input byte can correspond to many characters of output.
1840# - The number of output characters can be undetermined until the
1841# last input byte is received.
1842# - The number of output characters can vary depending on previous input.
1843
1844class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1845 """
1846 For testing seek/tell behavior with a stateful, buffering decoder.
1847
1848 Input is a sequence of words. Words may be fixed-length (length set
1849 by input) or variable-length (period-terminated). In variable-length
1850 mode, extra periods are ignored. Possible words are:
1851 - 'i' followed by a number sets the input length, I (maximum 99).
1852 When I is set to 0, words are space-terminated.
1853 - 'o' followed by a number sets the output length, O (maximum 99).
1854 - Any other word is converted into a word followed by a period on
1855 the output. The output word consists of the input word truncated
1856 or padded out with hyphens to make its length equal to O. If O
1857 is 0, the word is output verbatim without truncating or padding.
1858 I and O are initially set to 1. When I changes, any buffered input is
1859 re-scanned according to the new I. EOF also terminates the last word.
1860 """
1861
1862 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001863 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001864 self.reset()
1865
1866 def __repr__(self):
1867 return '<SID %x>' % id(self)
1868
1869 def reset(self):
1870 self.i = 1
1871 self.o = 1
1872 self.buffer = bytearray()
1873
1874 def getstate(self):
1875 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1876 return bytes(self.buffer), i*100 + o
1877
1878 def setstate(self, state):
1879 buffer, io = state
1880 self.buffer = bytearray(buffer)
1881 i, o = divmod(io, 100)
1882 self.i, self.o = i ^ 1, o ^ 1
1883
1884 def decode(self, input, final=False):
1885 output = ''
1886 for b in input:
1887 if self.i == 0: # variable-length, terminated with period
1888 if b == ord('.'):
1889 if self.buffer:
1890 output += self.process_word()
1891 else:
1892 self.buffer.append(b)
1893 else: # fixed-length, terminate after self.i bytes
1894 self.buffer.append(b)
1895 if len(self.buffer) == self.i:
1896 output += self.process_word()
1897 if final and self.buffer: # EOF terminates the last word
1898 output += self.process_word()
1899 return output
1900
1901 def process_word(self):
1902 output = ''
1903 if self.buffer[0] == ord('i'):
1904 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1905 elif self.buffer[0] == ord('o'):
1906 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1907 else:
1908 output = self.buffer.decode('ascii')
1909 if len(output) < self.o:
1910 output += '-'*self.o # pad out with hyphens
1911 if self.o:
1912 output = output[:self.o] # truncate to output length
1913 output += '.'
1914 self.buffer = bytearray()
1915 return output
1916
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001917 codecEnabled = False
1918
1919 @classmethod
1920 def lookupTestDecoder(cls, name):
1921 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001922 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001923 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001924 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001925 incrementalencoder=None,
1926 streamreader=None, streamwriter=None,
1927 incrementaldecoder=cls)
1928
1929# Register the previous decoder for testing.
1930# Disabled by default, tests will enable it.
1931codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1932
1933
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001934class StatefulIncrementalDecoderTest(unittest.TestCase):
1935 """
1936 Make sure the StatefulIncrementalDecoder actually works.
1937 """
1938
1939 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001940 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001941 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001942 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001943 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001944 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001945 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001946 # I=0, O=6 (variable-length input, fixed-length output)
1947 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1948 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001949 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001950 # I=6, O=3 (fixed-length input > fixed-length output)
1951 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1952 # I=0, then 3; O=29, then 15 (with longer output)
1953 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1954 'a----------------------------.' +
1955 'b----------------------------.' +
1956 'cde--------------------------.' +
1957 'abcdefghijabcde.' +
1958 'a.b------------.' +
1959 '.c.------------.' +
1960 'd.e------------.' +
1961 'k--------------.' +
1962 'l--------------.' +
1963 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001964 ]
1965
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001966 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001967 # Try a few one-shot test cases.
1968 for input, eof, output in self.test_cases:
1969 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001970 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001971
1972 # Also test an unfinished decode, followed by forcing EOF.
1973 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001974 self.assertEqual(d.decode(b'oiabcd'), '')
1975 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001976
1977class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001978
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001979 def setUp(self):
1980 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1981 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001982 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001983
Guido van Rossumd0712812007-04-11 16:32:43 +00001984 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001985 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001986
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001987 def test_constructor(self):
1988 r = self.BytesIO(b"\xc3\xa9\n\n")
1989 b = self.BufferedReader(r, 1000)
1990 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001991 t.__init__(b, encoding="latin-1", newline="\r\n")
1992 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001993 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001994 t.__init__(b, encoding="utf-8", line_buffering=True)
1995 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001996 self.assertEqual(t.line_buffering, True)
1997 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001998 self.assertRaises(TypeError, t.__init__, b, newline=42)
1999 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2000
Nick Coghlana9b15242014-02-04 22:11:18 +10002001 def test_non_text_encoding_codecs_are_rejected(self):
2002 # Ensure the constructor complains if passed a codec that isn't
2003 # marked as a text encoding
2004 # http://bugs.python.org/issue20404
2005 r = self.BytesIO()
2006 b = self.BufferedWriter(r)
2007 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2008 self.TextIOWrapper(b, encoding="hex")
2009
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002010 def test_detach(self):
2011 r = self.BytesIO()
2012 b = self.BufferedWriter(r)
2013 t = self.TextIOWrapper(b)
2014 self.assertIs(t.detach(), b)
2015
2016 t = self.TextIOWrapper(b, encoding="ascii")
2017 t.write("howdy")
2018 self.assertFalse(r.getvalue())
2019 t.detach()
2020 self.assertEqual(r.getvalue(), b"howdy")
2021 self.assertRaises(ValueError, t.detach)
2022
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002023 def test_repr(self):
2024 raw = self.BytesIO("hello".encode("utf-8"))
2025 b = self.BufferedReader(raw)
2026 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002027 modname = self.TextIOWrapper.__module__
2028 self.assertEqual(repr(t),
2029 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2030 raw.name = "dummy"
2031 self.assertEqual(repr(t),
2032 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002033 t.mode = "r"
2034 self.assertEqual(repr(t),
2035 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002036 raw.name = b"dummy"
2037 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002038 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002039
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002040 def test_line_buffering(self):
2041 r = self.BytesIO()
2042 b = self.BufferedWriter(r, 1000)
2043 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002044 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002045 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002046 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002047 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002048 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002049 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002050
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002051 def test_default_encoding(self):
2052 old_environ = dict(os.environ)
2053 try:
2054 # try to get a user preferred encoding different than the current
2055 # locale encoding to check that TextIOWrapper() uses the current
2056 # locale encoding and not the user preferred encoding
2057 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2058 if key in os.environ:
2059 del os.environ[key]
2060
2061 current_locale_encoding = locale.getpreferredencoding(False)
2062 b = self.BytesIO()
2063 t = self.TextIOWrapper(b)
2064 self.assertEqual(t.encoding, current_locale_encoding)
2065 finally:
2066 os.environ.clear()
2067 os.environ.update(old_environ)
2068
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002069 @support.cpython_only
Serhiy Storchaka78980432013-01-15 01:12:17 +02002070 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002071 # Issue 15989
2072 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002073 b = self.BytesIO()
2074 b.fileno = lambda: _testcapi.INT_MAX + 1
2075 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2076 b.fileno = lambda: _testcapi.UINT_MAX + 1
2077 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2078
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002079 def test_encoding(self):
2080 # Check the encoding attribute is always set, and valid
2081 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002082 t = self.TextIOWrapper(b, encoding="utf-8")
2083 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002084 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002085 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002086 codecs.lookup(t.encoding)
2087
2088 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002089 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002090 b = self.BytesIO(b"abc\n\xff\n")
2091 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002092 self.assertRaises(UnicodeError, t.read)
2093 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002094 b = self.BytesIO(b"abc\n\xff\n")
2095 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002096 self.assertRaises(UnicodeError, t.read)
2097 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002098 b = self.BytesIO(b"abc\n\xff\n")
2099 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002100 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002101 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002102 b = self.BytesIO(b"abc\n\xff\n")
2103 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002104 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002105
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002106 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002107 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002108 b = self.BytesIO()
2109 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002110 self.assertRaises(UnicodeError, t.write, "\xff")
2111 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002112 b = self.BytesIO()
2113 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002114 self.assertRaises(UnicodeError, t.write, "\xff")
2115 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002116 b = self.BytesIO()
2117 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002118 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002119 t.write("abc\xffdef\n")
2120 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002121 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002122 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002123 b = self.BytesIO()
2124 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002125 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002126 t.write("abc\xffdef\n")
2127 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002128 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002129
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002130 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002131 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2132
2133 tests = [
2134 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002135 [ '', input_lines ],
2136 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2137 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2138 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002139 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002140 encodings = (
2141 'utf-8', 'latin-1',
2142 'utf-16', 'utf-16-le', 'utf-16-be',
2143 'utf-32', 'utf-32-le', 'utf-32-be',
2144 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002145
Guido van Rossum8358db22007-08-18 21:39:55 +00002146 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002147 # character in TextIOWrapper._pending_line.
2148 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002149 # XXX: str.encode() should return bytes
2150 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002151 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002152 for bufsize in range(1, 10):
2153 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002154 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2155 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002156 encoding=encoding)
2157 if do_reads:
2158 got_lines = []
2159 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002160 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002161 if c2 == '':
2162 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002163 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002164 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002165 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002166 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002167
2168 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002169 self.assertEqual(got_line, exp_line)
2170 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002171
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002172 def test_newlines_input(self):
2173 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002174 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2175 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002176 (None, normalized.decode("ascii").splitlines(keepends=True)),
2177 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002178 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2179 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2180 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002181 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002182 buf = self.BytesIO(testdata)
2183 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002184 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002185 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002186 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002188 def test_newlines_output(self):
2189 testdict = {
2190 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2191 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2192 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2193 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2194 }
2195 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2196 for newline, expected in tests:
2197 buf = self.BytesIO()
2198 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2199 txt.write("AAA\nB")
2200 txt.write("BB\nCCC\n")
2201 txt.write("X\rY\r\nZ")
2202 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002203 self.assertEqual(buf.closed, False)
2204 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002205
2206 def test_destructor(self):
2207 l = []
2208 base = self.BytesIO
2209 class MyBytesIO(base):
2210 def close(self):
2211 l.append(self.getvalue())
2212 base.close(self)
2213 b = MyBytesIO()
2214 t = self.TextIOWrapper(b, encoding="ascii")
2215 t.write("abc")
2216 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002217 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002218 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002219
2220 def test_override_destructor(self):
2221 record = []
2222 class MyTextIO(self.TextIOWrapper):
2223 def __del__(self):
2224 record.append(1)
2225 try:
2226 f = super().__del__
2227 except AttributeError:
2228 pass
2229 else:
2230 f()
2231 def close(self):
2232 record.append(2)
2233 super().close()
2234 def flush(self):
2235 record.append(3)
2236 super().flush()
2237 b = self.BytesIO()
2238 t = MyTextIO(b, encoding="ascii")
2239 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002240 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002241 self.assertEqual(record, [1, 2, 3])
2242
2243 def test_error_through_destructor(self):
2244 # Test that the exception state is not modified by a destructor,
2245 # even if close() fails.
2246 rawio = self.CloseFailureIO()
2247 def f():
2248 self.TextIOWrapper(rawio).xyzzy
2249 with support.captured_output("stderr") as s:
2250 self.assertRaises(AttributeError, f)
2251 s = s.getvalue().strip()
2252 if s:
2253 # The destructor *may* have printed an unraisable error, check it
2254 self.assertEqual(len(s.splitlines()), 1)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002255 self.assertTrue(s.startswith("Exception OSError: "), s)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002256 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002257
Guido van Rossum9b76da62007-04-11 01:09:03 +00002258 # Systematic tests of the text I/O API
2259
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002260 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002261 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002262 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002263 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002264 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002265 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002266 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002267 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002268 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002269 self.assertEqual(f.tell(), 0)
2270 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002271 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002272 self.assertEqual(f.seek(0), 0)
2273 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002274 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002275 self.assertEqual(f.read(2), "ab")
2276 self.assertEqual(f.read(1), "c")
2277 self.assertEqual(f.read(1), "")
2278 self.assertEqual(f.read(), "")
2279 self.assertEqual(f.tell(), cookie)
2280 self.assertEqual(f.seek(0), 0)
2281 self.assertEqual(f.seek(0, 2), cookie)
2282 self.assertEqual(f.write("def"), 3)
2283 self.assertEqual(f.seek(cookie), cookie)
2284 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002285 if enc.startswith("utf"):
2286 self.multi_line_test(f, enc)
2287 f.close()
2288
2289 def multi_line_test(self, f, enc):
2290 f.seek(0)
2291 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002292 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002293 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002294 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002295 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002296 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002297 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002298 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002299 wlines.append((f.tell(), line))
2300 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002301 f.seek(0)
2302 rlines = []
2303 while True:
2304 pos = f.tell()
2305 line = f.readline()
2306 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002307 break
2308 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002309 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002310
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002311 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002312 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002313 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002314 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002315 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002316 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002317 p2 = f.tell()
2318 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002319 self.assertEqual(f.tell(), p0)
2320 self.assertEqual(f.readline(), "\xff\n")
2321 self.assertEqual(f.tell(), p1)
2322 self.assertEqual(f.readline(), "\xff\n")
2323 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002324 f.seek(0)
2325 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002326 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002327 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002328 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002329 f.close()
2330
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002331 def test_seeking(self):
2332 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002333 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002334 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002335 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002336 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002337 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002338 suffix = bytes(u_suffix.encode("utf-8"))
2339 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002340 with self.open(support.TESTFN, "wb") as f:
2341 f.write(line*2)
2342 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2343 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002344 self.assertEqual(s, str(prefix, "ascii"))
2345 self.assertEqual(f.tell(), prefix_size)
2346 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002347
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002348 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002349 # Regression test for a specific bug
2350 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002351 with self.open(support.TESTFN, "wb") as f:
2352 f.write(data)
2353 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2354 f._CHUNK_SIZE # Just test that it exists
2355 f._CHUNK_SIZE = 2
2356 f.readline()
2357 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002358
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002359 def test_seek_and_tell(self):
2360 #Test seek/tell using the StatefulIncrementalDecoder.
2361 # Make test faster by doing smaller seeks
2362 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002363
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002364 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002365 """Tell/seek to various points within a data stream and ensure
2366 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002367 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002368 f.write(data)
2369 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002370 f = self.open(support.TESTFN, encoding='test_decoder')
2371 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002372 decoded = f.read()
2373 f.close()
2374
Neal Norwitze2b07052008-03-18 19:52:05 +00002375 for i in range(min_pos, len(decoded) + 1): # seek positions
2376 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002377 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002378 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002379 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002380 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002381 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002382 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002383 f.close()
2384
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002385 # Enable the test decoder.
2386 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002387
2388 # Run the tests.
2389 try:
2390 # Try each test case.
2391 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002392 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002393
2394 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002395 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2396 offset = CHUNK_SIZE - len(input)//2
2397 prefix = b'.'*offset
2398 # Don't bother seeking into the prefix (takes too long).
2399 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002400 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002401
2402 # Ensure our test decoder won't interfere with subsequent tests.
2403 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002404 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002405
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002406 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002407 data = "1234567890"
2408 tests = ("utf-16",
2409 "utf-16-le",
2410 "utf-16-be",
2411 "utf-32",
2412 "utf-32-le",
2413 "utf-32-be")
2414 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002415 buf = self.BytesIO()
2416 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002417 # Check if the BOM is written only once (see issue1753).
2418 f.write(data)
2419 f.write(data)
2420 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002421 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002422 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002423 self.assertEqual(f.read(), data * 2)
2424 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002425
Benjamin Petersona1b49012009-03-31 23:11:32 +00002426 def test_unreadable(self):
2427 class UnReadable(self.BytesIO):
2428 def readable(self):
2429 return False
2430 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002431 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00002432
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002433 def test_read_one_by_one(self):
2434 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002435 reads = ""
2436 while True:
2437 c = txt.read(1)
2438 if not c:
2439 break
2440 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002441 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002442
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002443 def test_readlines(self):
2444 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2445 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2446 txt.seek(0)
2447 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2448 txt.seek(0)
2449 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2450
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002451 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002452 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002453 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002454 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002455 reads = ""
2456 while True:
2457 c = txt.read(128)
2458 if not c:
2459 break
2460 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002461 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002462
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002463 def test_writelines(self):
2464 l = ['ab', 'cd', 'ef']
2465 buf = self.BytesIO()
2466 txt = self.TextIOWrapper(buf)
2467 txt.writelines(l)
2468 txt.flush()
2469 self.assertEqual(buf.getvalue(), b'abcdef')
2470
2471 def test_writelines_userlist(self):
2472 l = UserList(['ab', 'cd', 'ef'])
2473 buf = self.BytesIO()
2474 txt = self.TextIOWrapper(buf)
2475 txt.writelines(l)
2476 txt.flush()
2477 self.assertEqual(buf.getvalue(), b'abcdef')
2478
2479 def test_writelines_error(self):
2480 txt = self.TextIOWrapper(self.BytesIO())
2481 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2482 self.assertRaises(TypeError, txt.writelines, None)
2483 self.assertRaises(TypeError, txt.writelines, b'abc')
2484
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002485 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002486 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002487
2488 # read one char at a time
2489 reads = ""
2490 while True:
2491 c = txt.read(1)
2492 if not c:
2493 break
2494 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002495 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002496
2497 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002498 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002499 txt._CHUNK_SIZE = 4
2500
2501 reads = ""
2502 while True:
2503 c = txt.read(4)
2504 if not c:
2505 break
2506 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002507 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002508
2509 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002510 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002511 txt._CHUNK_SIZE = 4
2512
2513 reads = txt.read(4)
2514 reads += txt.read(4)
2515 reads += txt.readline()
2516 reads += txt.readline()
2517 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002518 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002519
2520 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002521 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002522 txt._CHUNK_SIZE = 4
2523
2524 reads = txt.read(4)
2525 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002526 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002527
2528 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002529 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002530 txt._CHUNK_SIZE = 4
2531
2532 reads = txt.read(4)
2533 pos = txt.tell()
2534 txt.seek(0)
2535 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002536 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002537
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002538 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002539 buffer = self.BytesIO(self.testdata)
2540 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002541
2542 self.assertEqual(buffer.seekable(), txt.seekable())
2543
Antoine Pitroue4501852009-05-14 18:55:55 +00002544 def test_append_bom(self):
2545 # The BOM is not written again when appending to a non-empty file
2546 filename = support.TESTFN
2547 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2548 with self.open(filename, 'w', encoding=charset) as f:
2549 f.write('aaa')
2550 pos = f.tell()
2551 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002552 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002553
2554 with self.open(filename, 'a', encoding=charset) as f:
2555 f.write('xxx')
2556 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002557 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002558
2559 def test_seek_bom(self):
2560 # Same test, but when seeking manually
2561 filename = support.TESTFN
2562 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2563 with self.open(filename, 'w', encoding=charset) as f:
2564 f.write('aaa')
2565 pos = f.tell()
2566 with self.open(filename, 'r+', encoding=charset) as f:
2567 f.seek(pos)
2568 f.write('zzz')
2569 f.seek(0)
2570 f.write('bbb')
2571 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002572 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002573
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002574 def test_errors_property(self):
2575 with self.open(support.TESTFN, "w") as f:
2576 self.assertEqual(f.errors, "strict")
2577 with self.open(support.TESTFN, "w", errors="replace") as f:
2578 self.assertEqual(f.errors, "replace")
2579
Brett Cannon31f59292011-02-21 19:29:56 +00002580 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002581 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002582 def test_threads_write(self):
2583 # Issue6750: concurrent writes could duplicate data
2584 event = threading.Event()
2585 with self.open(support.TESTFN, "w", buffering=1) as f:
2586 def run(n):
2587 text = "Thread%03d\n" % n
2588 event.wait()
2589 f.write(text)
2590 threads = [threading.Thread(target=lambda n=x: run(n))
2591 for x in range(20)]
2592 for t in threads:
2593 t.start()
2594 time.sleep(0.02)
2595 event.set()
2596 for t in threads:
2597 t.join()
2598 with self.open(support.TESTFN) as f:
2599 content = f.read()
2600 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002601 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002602
Antoine Pitrou6be88762010-05-03 16:48:20 +00002603 def test_flush_error_on_close(self):
2604 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2605 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002606 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00002607 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002608 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002609 self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002610
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03002611 def test_close_error_on_close(self):
2612 buffer = self.BytesIO(self.testdata)
2613 def bad_flush():
2614 raise OSError('flush')
2615 def bad_close():
2616 raise OSError('close')
2617 buffer.close = bad_close
2618 txt = self.TextIOWrapper(buffer, encoding="ascii")
2619 txt.flush = bad_flush
2620 with self.assertRaises(OSError) as err: # exception not swallowed
2621 txt.close()
2622 self.assertEqual(err.exception.args, ('close',))
2623 self.assertIsInstance(err.exception.__context__, OSError)
2624 self.assertEqual(err.exception.__context__.args, ('flush',))
2625 self.assertFalse(txt.closed)
2626
2627 def test_nonnormalized_close_error_on_close(self):
2628 # Issue #21677
2629 buffer = self.BytesIO(self.testdata)
2630 def bad_flush():
2631 raise non_existing_flush
2632 def bad_close():
2633 raise non_existing_close
2634 buffer.close = bad_close
2635 txt = self.TextIOWrapper(buffer, encoding="ascii")
2636 txt.flush = bad_flush
2637 with self.assertRaises(NameError) as err: # exception not swallowed
2638 txt.close()
2639 self.assertIn('non_existing_close', str(err.exception))
2640 self.assertIsInstance(err.exception.__context__, NameError)
2641 self.assertIn('non_existing_flush', str(err.exception.__context__))
2642 self.assertFalse(txt.closed)
2643
Antoine Pitrou6be88762010-05-03 16:48:20 +00002644 def test_multi_close(self):
2645 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2646 txt.close()
2647 txt.close()
2648 txt.close()
2649 self.assertRaises(ValueError, txt.flush)
2650
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002651 def test_unseekable(self):
2652 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2653 self.assertRaises(self.UnsupportedOperation, txt.tell)
2654 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2655
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002656 def test_readonly_attributes(self):
2657 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2658 buf = self.BytesIO(self.testdata)
2659 with self.assertRaises(AttributeError):
2660 txt.buffer = buf
2661
Antoine Pitroue96ec682011-07-23 21:46:35 +02002662 def test_rawio(self):
2663 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2664 # that subprocess.Popen() can have the required unbuffered
2665 # semantics with universal_newlines=True.
2666 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2667 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2668 # Reads
2669 self.assertEqual(txt.read(4), 'abcd')
2670 self.assertEqual(txt.readline(), 'efghi\n')
2671 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2672
2673 def test_rawio_write_through(self):
2674 # Issue #12591: with write_through=True, writes don't need a flush
2675 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2676 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2677 write_through=True)
2678 txt.write('1')
2679 txt.write('23\n4')
2680 txt.write('5')
2681 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2682
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02002683 def test_bufio_write_through(self):
2684 # Issue #21396: write_through=True doesn't force a flush()
2685 # on the underlying binary buffered object.
2686 flush_called, write_called = [], []
2687 class BufferedWriter(self.BufferedWriter):
2688 def flush(self, *args, **kwargs):
2689 flush_called.append(True)
2690 return super().flush(*args, **kwargs)
2691 def write(self, *args, **kwargs):
2692 write_called.append(True)
2693 return super().write(*args, **kwargs)
2694
2695 rawio = self.BytesIO()
2696 data = b"a"
2697 bufio = BufferedWriter(rawio, len(data)*2)
2698 textio = self.TextIOWrapper(bufio, encoding='ascii',
2699 write_through=True)
2700 # write to the buffered io but don't overflow the buffer
2701 text = data.decode('ascii')
2702 textio.write(text)
2703
2704 # buffer.flush is not called with write_through=True
2705 self.assertFalse(flush_called)
2706 # buffer.write *is* called with write_through=True
2707 self.assertTrue(write_called)
2708 self.assertEqual(rawio.getvalue(), b"") # no flush
2709
2710 write_called = [] # reset
2711 textio.write(text * 10) # total content is larger than bufio buffer
2712 self.assertTrue(write_called)
2713 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
2714
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002715 def test_read_nonbytes(self):
2716 # Issue #17106
2717 # Crash when underlying read() returns non-bytes
2718 t = self.TextIOWrapper(self.StringIO('a'))
2719 self.assertRaises(TypeError, t.read, 1)
2720 t = self.TextIOWrapper(self.StringIO('a'))
2721 self.assertRaises(TypeError, t.readline)
2722 t = self.TextIOWrapper(self.StringIO('a'))
2723 self.assertRaises(TypeError, t.read)
2724
2725 def test_illegal_decoder(self):
2726 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10002727 # Bypass the early encoding check added in issue 20404
2728 def _make_illegal_wrapper():
2729 quopri = codecs.lookup("quopri")
2730 quopri._is_text_encoding = True
2731 try:
2732 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2733 newline='\n', encoding="quopri")
2734 finally:
2735 quopri._is_text_encoding = False
2736 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002737 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10002738 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002739 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10002740 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002741 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10002742 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002743 self.assertRaises(TypeError, t.read)
2744
Antoine Pitrou712cb732013-12-21 15:51:54 +01002745 def _check_create_at_shutdown(self, **kwargs):
2746 # Issue #20037: creating a TextIOWrapper at shutdown
2747 # shouldn't crash the interpreter.
2748 iomod = self.io.__name__
2749 code = """if 1:
2750 import codecs
2751 import {iomod} as io
2752
2753 # Avoid looking up codecs at shutdown
2754 codecs.lookup('utf-8')
2755
2756 class C:
2757 def __init__(self):
2758 self.buf = io.BytesIO()
2759 def __del__(self):
2760 io.TextIOWrapper(self.buf, **{kwargs})
2761 print("ok")
2762 c = C()
2763 """.format(iomod=iomod, kwargs=kwargs)
2764 return assert_python_ok("-c", code)
2765
2766 def test_create_at_shutdown_without_encoding(self):
2767 rc, out, err = self._check_create_at_shutdown()
2768 if err:
2769 # Can error out with a RuntimeError if the module state
2770 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10002771 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01002772 else:
2773 self.assertEqual("ok", out.decode().strip())
2774
2775 def test_create_at_shutdown_with_encoding(self):
2776 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
2777 errors='strict')
2778 self.assertFalse(err)
2779 self.assertEqual("ok", out.decode().strip())
2780
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002781
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002782class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01002783 io = io
Nick Coghlana9b15242014-02-04 22:11:18 +10002784 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002785
2786 def test_initialization(self):
2787 r = self.BytesIO(b"\xc3\xa9\n\n")
2788 b = self.BufferedReader(r, 1000)
2789 t = self.TextIOWrapper(b)
2790 self.assertRaises(TypeError, t.__init__, b, newline=42)
2791 self.assertRaises(ValueError, t.read)
2792 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2793 self.assertRaises(ValueError, t.read)
2794
2795 def test_garbage_collection(self):
2796 # C TextIOWrapper objects are collected, and collecting them flushes
2797 # all data to disk.
2798 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02002799 with support.check_warnings(('', ResourceWarning)):
2800 rawio = io.FileIO(support.TESTFN, "wb")
2801 b = self.BufferedWriter(rawio)
2802 t = self.TextIOWrapper(b, encoding="ascii")
2803 t.write("456def")
2804 t.x = t
2805 wr = weakref.ref(t)
2806 del t
2807 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002808 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002809 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002810 self.assertEqual(f.read(), b"456def")
2811
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002812 def test_rwpair_cleared_before_textio(self):
2813 # Issue 13070: TextIOWrapper's finalization would crash when called
2814 # after the reference to the underlying BufferedRWPair's writer got
2815 # cleared by the GC.
2816 for i in range(1000):
2817 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2818 t1 = self.TextIOWrapper(b1, encoding="ascii")
2819 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2820 t2 = self.TextIOWrapper(b2, encoding="ascii")
2821 # circular references
2822 t1.buddy = t2
2823 t2.buddy = t1
2824 support.gc_collect()
2825
2826
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002827class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01002828 io = pyio
Serhiy Storchakad667d722014-02-10 19:09:19 +02002829 #shutdown_error = "LookupError: unknown encoding: ascii"
2830 shutdown_error = "TypeError: 'NoneType' object is not iterable"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002831
2832
2833class IncrementalNewlineDecoderTest(unittest.TestCase):
2834
2835 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002836 # UTF-8 specific tests for a newline decoder
2837 def _check_decode(b, s, **kwargs):
2838 # We exercise getstate() / setstate() as well as decode()
2839 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002840 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002841 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002842 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002843
Antoine Pitrou180a3362008-12-14 16:36:46 +00002844 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002845
Antoine Pitrou180a3362008-12-14 16:36:46 +00002846 _check_decode(b'\xe8', "")
2847 _check_decode(b'\xa2', "")
2848 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002849
Antoine Pitrou180a3362008-12-14 16:36:46 +00002850 _check_decode(b'\xe8', "")
2851 _check_decode(b'\xa2', "")
2852 _check_decode(b'\x88', "\u8888")
2853
2854 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002855 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2856
Antoine Pitrou180a3362008-12-14 16:36:46 +00002857 decoder.reset()
2858 _check_decode(b'\n', "\n")
2859 _check_decode(b'\r', "")
2860 _check_decode(b'', "\n", final=True)
2861 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002862
Antoine Pitrou180a3362008-12-14 16:36:46 +00002863 _check_decode(b'\r', "")
2864 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002865
Antoine Pitrou180a3362008-12-14 16:36:46 +00002866 _check_decode(b'\r\r\n', "\n\n")
2867 _check_decode(b'\r', "")
2868 _check_decode(b'\r', "\n")
2869 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002870
Antoine Pitrou180a3362008-12-14 16:36:46 +00002871 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2872 _check_decode(b'\xe8\xa2\x88', "\u8888")
2873 _check_decode(b'\n', "\n")
2874 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2875 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002876
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002877 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002878 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002879 if encoding is not None:
2880 encoder = codecs.getincrementalencoder(encoding)()
2881 def _decode_bytewise(s):
2882 # Decode one byte at a time
2883 for b in encoder.encode(s):
2884 result.append(decoder.decode(bytes([b])))
2885 else:
2886 encoder = None
2887 def _decode_bytewise(s):
2888 # Decode one char at a time
2889 for c in s:
2890 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002891 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002892 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002893 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002894 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002895 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002896 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002897 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002898 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002899 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002900 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002901 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002902 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002903 input = "abc"
2904 if encoder is not None:
2905 encoder.reset()
2906 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002907 self.assertEqual(decoder.decode(input), "abc")
2908 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002909
2910 def test_newline_decoder(self):
2911 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002912 # None meaning the IncrementalNewlineDecoder takes unicode input
2913 # rather than bytes input
2914 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002915 'utf-16', 'utf-16-le', 'utf-16-be',
2916 'utf-32', 'utf-32-le', 'utf-32-be',
2917 )
2918 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002919 decoder = enc and codecs.getincrementaldecoder(enc)()
2920 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2921 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002922 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002923 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2924 self.check_newline_decoding_utf8(decoder)
2925
Antoine Pitrou66913e22009-03-06 23:40:56 +00002926 def test_newline_bytes(self):
2927 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2928 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002929 self.assertEqual(dec.newlines, None)
2930 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2931 self.assertEqual(dec.newlines, None)
2932 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2933 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002934 dec = self.IncrementalNewlineDecoder(None, translate=False)
2935 _check(dec)
2936 dec = self.IncrementalNewlineDecoder(None, translate=True)
2937 _check(dec)
2938
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002939class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2940 pass
2941
2942class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2943 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002944
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002945
Guido van Rossum01a27522007-03-07 01:00:12 +00002946# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002947
Guido van Rossum5abbf752007-08-27 17:39:33 +00002948class MiscIOTest(unittest.TestCase):
2949
Barry Warsaw40e82462008-11-20 20:14:50 +00002950 def tearDown(self):
2951 support.unlink(support.TESTFN)
2952
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002953 def test___all__(self):
2954 for name in self.io.__all__:
2955 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002956 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002957 if name == "open":
2958 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002959 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002960 self.assertTrue(issubclass(obj, Exception), name)
2961 elif not name.startswith("SEEK_"):
2962 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002963
Barry Warsaw40e82462008-11-20 20:14:50 +00002964 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002965 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002966 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002967 f.close()
2968
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02002969 with support.check_warnings(('', DeprecationWarning)):
2970 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002971 self.assertEqual(f.name, support.TESTFN)
2972 self.assertEqual(f.buffer.name, support.TESTFN)
2973 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2974 self.assertEqual(f.mode, "U")
2975 self.assertEqual(f.buffer.mode, "rb")
2976 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002977 f.close()
2978
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002979 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002980 self.assertEqual(f.mode, "w+")
2981 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2982 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002983
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002984 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002985 self.assertEqual(g.mode, "wb")
2986 self.assertEqual(g.raw.mode, "wb")
2987 self.assertEqual(g.name, f.fileno())
2988 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002989 f.close()
2990 g.close()
2991
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002992 def test_io_after_close(self):
2993 for kwargs in [
2994 {"mode": "w"},
2995 {"mode": "wb"},
2996 {"mode": "w", "buffering": 1},
2997 {"mode": "w", "buffering": 2},
2998 {"mode": "wb", "buffering": 0},
2999 {"mode": "r"},
3000 {"mode": "rb"},
3001 {"mode": "r", "buffering": 1},
3002 {"mode": "r", "buffering": 2},
3003 {"mode": "rb", "buffering": 0},
3004 {"mode": "w+"},
3005 {"mode": "w+b"},
3006 {"mode": "w+", "buffering": 1},
3007 {"mode": "w+", "buffering": 2},
3008 {"mode": "w+b", "buffering": 0},
3009 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003010 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003011 f.close()
3012 self.assertRaises(ValueError, f.flush)
3013 self.assertRaises(ValueError, f.fileno)
3014 self.assertRaises(ValueError, f.isatty)
3015 self.assertRaises(ValueError, f.__iter__)
3016 if hasattr(f, "peek"):
3017 self.assertRaises(ValueError, f.peek, 1)
3018 self.assertRaises(ValueError, f.read)
3019 if hasattr(f, "read1"):
3020 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003021 if hasattr(f, "readall"):
3022 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003023 if hasattr(f, "readinto"):
3024 self.assertRaises(ValueError, f.readinto, bytearray(1024))
3025 self.assertRaises(ValueError, f.readline)
3026 self.assertRaises(ValueError, f.readlines)
3027 self.assertRaises(ValueError, f.seek, 0)
3028 self.assertRaises(ValueError, f.tell)
3029 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003030 self.assertRaises(ValueError, f.write,
3031 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003032 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003033 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003034
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003035 def test_blockingioerror(self):
3036 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003037 class C(str):
3038 pass
3039 c = C("")
3040 b = self.BlockingIOError(1, c)
3041 c.b = b
3042 b.c = c
3043 wr = weakref.ref(c)
3044 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003045 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003046 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003047
3048 def test_abcs(self):
3049 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003050 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3051 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3052 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3053 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003054
3055 def _check_abc_inheritance(self, abcmodule):
3056 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003057 self.assertIsInstance(f, abcmodule.IOBase)
3058 self.assertIsInstance(f, abcmodule.RawIOBase)
3059 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3060 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003061 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003062 self.assertIsInstance(f, abcmodule.IOBase)
3063 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3064 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3065 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003066 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003067 self.assertIsInstance(f, abcmodule.IOBase)
3068 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3069 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3070 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003071
3072 def test_abc_inheritance(self):
3073 # Test implementations inherit from their respective ABCs
3074 self._check_abc_inheritance(self)
3075
3076 def test_abc_inheritance_official(self):
3077 # Test implementations inherit from the official ABCs of the
3078 # baseline "io" module.
3079 self._check_abc_inheritance(io)
3080
Antoine Pitroue033e062010-10-29 10:38:18 +00003081 def _check_warn_on_dealloc(self, *args, **kwargs):
3082 f = open(*args, **kwargs)
3083 r = repr(f)
3084 with self.assertWarns(ResourceWarning) as cm:
3085 f = None
3086 support.gc_collect()
3087 self.assertIn(r, str(cm.warning.args[0]))
3088
3089 def test_warn_on_dealloc(self):
3090 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3091 self._check_warn_on_dealloc(support.TESTFN, "wb")
3092 self._check_warn_on_dealloc(support.TESTFN, "w")
3093
3094 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3095 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00003096 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00003097 for fd in fds:
3098 try:
3099 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02003100 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00003101 if e.errno != errno.EBADF:
3102 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00003103 self.addCleanup(cleanup_fds)
3104 r, w = os.pipe()
3105 fds += r, w
3106 self._check_warn_on_dealloc(r, *args, **kwargs)
3107 # When using closefd=False, there's no warning
3108 r, w = os.pipe()
3109 fds += r, w
3110 with warnings.catch_warnings(record=True) as recorded:
3111 open(r, *args, closefd=False, **kwargs)
3112 support.gc_collect()
3113 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00003114
3115 def test_warn_on_dealloc_fd(self):
3116 self._check_warn_on_dealloc_fd("rb", buffering=0)
3117 self._check_warn_on_dealloc_fd("rb")
3118 self._check_warn_on_dealloc_fd("r")
3119
3120
Antoine Pitrou243757e2010-11-05 21:15:39 +00003121 def test_pickling(self):
3122 # Pickling file objects is forbidden
3123 for kwargs in [
3124 {"mode": "w"},
3125 {"mode": "wb"},
3126 {"mode": "wb", "buffering": 0},
3127 {"mode": "r"},
3128 {"mode": "rb"},
3129 {"mode": "rb", "buffering": 0},
3130 {"mode": "w+"},
3131 {"mode": "w+b"},
3132 {"mode": "w+b", "buffering": 0},
3133 ]:
3134 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3135 with self.open(support.TESTFN, **kwargs) as f:
3136 self.assertRaises(TypeError, pickle.dumps, f, protocol)
3137
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003138 @unittest.skipUnless(fcntl, 'fcntl required for this test')
3139 def test_nonblock_pipe_write_bigbuf(self):
3140 self._test_nonblock_pipe_write(16*1024)
3141
3142 @unittest.skipUnless(fcntl, 'fcntl required for this test')
3143 def test_nonblock_pipe_write_smallbuf(self):
3144 self._test_nonblock_pipe_write(1024)
3145
3146 def _set_non_blocking(self, fd):
3147 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
3148 self.assertNotEqual(flags, -1)
3149 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
3150 self.assertEqual(res, 0)
3151
3152 def _test_nonblock_pipe_write(self, bufsize):
3153 sent = []
3154 received = []
3155 r, w = os.pipe()
3156 self._set_non_blocking(r)
3157 self._set_non_blocking(w)
3158
3159 # To exercise all code paths in the C implementation we need
3160 # to play with buffer sizes. For instance, if we choose a
3161 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
3162 # then we will never get a partial write of the buffer.
3163 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
3164 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
3165
3166 with rf, wf:
3167 for N in 9999, 73, 7574:
3168 try:
3169 i = 0
3170 while True:
3171 msg = bytes([i % 26 + 97]) * N
3172 sent.append(msg)
3173 wf.write(msg)
3174 i += 1
3175
3176 except self.BlockingIOError as e:
3177 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003178 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003179 sent[-1] = sent[-1][:e.characters_written]
3180 received.append(rf.read())
3181 msg = b'BLOCKED'
3182 wf.write(msg)
3183 sent.append(msg)
3184
3185 while True:
3186 try:
3187 wf.flush()
3188 break
3189 except self.BlockingIOError as e:
3190 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01003191 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01003192 self.assertEqual(e.characters_written, 0)
3193 received.append(rf.read())
3194
3195 received += iter(rf.read, None)
3196
3197 sent, received = b''.join(sent), b''.join(received)
3198 self.assertTrue(sent == received)
3199 self.assertTrue(wf.closed)
3200 self.assertTrue(rf.closed)
3201
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003202 def test_create_fail(self):
3203 # 'x' mode fails if file is existing
3204 with self.open(support.TESTFN, 'w'):
3205 pass
3206 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3207
3208 def test_create_writes(self):
3209 # 'x' mode opens for writing
3210 with self.open(support.TESTFN, 'xb') as f:
3211 f.write(b"spam")
3212 with self.open(support.TESTFN, 'rb') as f:
3213 self.assertEqual(b"spam", f.read())
3214
Christian Heimes7b648752012-09-10 14:48:43 +02003215 def test_open_allargs(self):
3216 # there used to be a buffer overflow in the parser for rawmode
3217 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3218
3219
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003220class CMiscIOTest(MiscIOTest):
3221 io = io
3222
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03003223 def test_readinto_buffer_overflow(self):
3224 # Issue #18025
3225 class BadReader(self.io.BufferedIOBase):
3226 def read(self, n=-1):
3227 return b'x' * 10**6
3228 bufio = BadReader()
3229 b = bytearray(2)
3230 self.assertRaises(ValueError, bufio.readinto, b)
3231
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003232class PyMiscIOTest(MiscIOTest):
3233 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003234
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003235
3236@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3237class SignalsTest(unittest.TestCase):
3238
3239 def setUp(self):
3240 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3241
3242 def tearDown(self):
3243 signal.signal(signal.SIGALRM, self.oldalrm)
3244
3245 def alarm_interrupt(self, sig, frame):
3246 1/0
3247
3248 @unittest.skipUnless(threading, 'Threading required for this test.')
3249 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3250 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003251 invokes the signal handler, and bubbles up the exception raised
3252 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003253 read_results = []
3254 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003255 if hasattr(signal, 'pthread_sigmask'):
3256 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003257 s = os.read(r, 1)
3258 read_results.append(s)
3259 t = threading.Thread(target=_read)
3260 t.daemon = True
3261 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003262 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003263 try:
3264 wio = self.io.open(w, **fdopen_kwargs)
3265 t.start()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003266 # Fill the pipe enough that the write will be blocking.
3267 # It will be interrupted by the timer armed above. Since the
3268 # other thread has read one byte, the low-level write will
3269 # return with a successful (partial) result rather than an EINTR.
3270 # The buffered IO layer must check for pending signal
3271 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02003272 signal.alarm(1)
3273 try:
3274 self.assertRaises(ZeroDivisionError,
3275 wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
3276 finally:
3277 signal.alarm(0)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003278 t.join()
3279 # We got one byte, get another one and check that it isn't a
3280 # repeat of the first one.
3281 read_results.append(os.read(r, 1))
3282 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3283 finally:
3284 os.close(w)
3285 os.close(r)
3286 # This is deliberate. If we didn't close the file descriptor
3287 # before closing wio, wio would try to flush its internal
3288 # buffer, and block again.
3289 try:
3290 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003291 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003292 if e.errno != errno.EBADF:
3293 raise
3294
3295 def test_interrupted_write_unbuffered(self):
3296 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3297
3298 def test_interrupted_write_buffered(self):
3299 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3300
3301 def test_interrupted_write_text(self):
3302 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3303
Brett Cannon31f59292011-02-21 19:29:56 +00003304 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003305 def check_reentrant_write(self, data, **fdopen_kwargs):
3306 def on_alarm(*args):
3307 # Will be called reentrantly from the same thread
3308 wio.write(data)
3309 1/0
3310 signal.signal(signal.SIGALRM, on_alarm)
3311 r, w = os.pipe()
3312 wio = self.io.open(w, **fdopen_kwargs)
3313 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003314 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003315 # Either the reentrant call to wio.write() fails with RuntimeError,
3316 # or the signal handler raises ZeroDivisionError.
3317 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3318 while 1:
3319 for i in range(100):
3320 wio.write(data)
3321 wio.flush()
3322 # Make sure the buffer doesn't fill up and block further writes
3323 os.read(r, len(data) * 100)
3324 exc = cm.exception
3325 if isinstance(exc, RuntimeError):
3326 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3327 finally:
3328 wio.close()
3329 os.close(r)
3330
3331 def test_reentrant_write_buffered(self):
3332 self.check_reentrant_write(b"xy", mode="wb")
3333
3334 def test_reentrant_write_text(self):
3335 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3336
Antoine Pitrou707ce822011-02-25 21:24:11 +00003337 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3338 """Check that a buffered read, when it gets interrupted (either
3339 returning a partial result or EINTR), properly invokes the signal
3340 handler and retries if the latter returned successfully."""
3341 r, w = os.pipe()
3342 fdopen_kwargs["closefd"] = False
3343 def alarm_handler(sig, frame):
3344 os.write(w, b"bar")
3345 signal.signal(signal.SIGALRM, alarm_handler)
3346 try:
3347 rio = self.io.open(r, **fdopen_kwargs)
3348 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003349 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003350 # Expected behaviour:
3351 # - first raw read() returns partial b"foo"
3352 # - second raw read() returns EINTR
3353 # - third raw read() returns b"bar"
3354 self.assertEqual(decode(rio.read(6)), "foobar")
3355 finally:
3356 rio.close()
3357 os.close(w)
3358 os.close(r)
3359
Antoine Pitrou20db5112011-08-19 20:32:34 +02003360 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003361 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3362 mode="rb")
3363
Antoine Pitrou20db5112011-08-19 20:32:34 +02003364 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003365 self.check_interrupted_read_retry(lambda x: x,
3366 mode="r")
3367
3368 @unittest.skipUnless(threading, 'Threading required for this test.')
3369 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3370 """Check that a buffered write, when it gets interrupted (either
3371 returning a partial result or EINTR), properly invokes the signal
3372 handler and retries if the latter returned successfully."""
3373 select = support.import_module("select")
3374 # A quantity that exceeds the buffer size of an anonymous pipe's
3375 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02003376 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00003377 r, w = os.pipe()
3378 fdopen_kwargs["closefd"] = False
3379 # We need a separate thread to read from the pipe and allow the
3380 # write() to finish. This thread is started after the SIGALRM is
3381 # received (forcing a first EINTR in write()).
3382 read_results = []
3383 write_finished = False
3384 def _read():
3385 while not write_finished:
3386 while r in select.select([r], [], [], 1.0)[0]:
3387 s = os.read(r, 1024)
3388 read_results.append(s)
3389 t = threading.Thread(target=_read)
3390 t.daemon = True
3391 def alarm1(sig, frame):
3392 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003393 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003394 def alarm2(sig, frame):
3395 t.start()
3396 signal.signal(signal.SIGALRM, alarm1)
3397 try:
3398 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003399 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003400 # Expected behaviour:
3401 # - first raw write() is partial (because of the limited pipe buffer
3402 # and the first alarm)
3403 # - second raw write() returns EINTR (because of the second alarm)
3404 # - subsequent write()s are successful (either partial or complete)
3405 self.assertEqual(N, wio.write(item * N))
3406 wio.flush()
3407 write_finished = True
3408 t.join()
3409 self.assertEqual(N, sum(len(x) for x in read_results))
3410 finally:
3411 write_finished = True
3412 os.close(w)
3413 os.close(r)
3414 # This is deliberate. If we didn't close the file descriptor
3415 # before closing wio, wio would try to flush its internal
3416 # buffer, and could block (in case of failure).
3417 try:
3418 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003419 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00003420 if e.errno != errno.EBADF:
3421 raise
3422
Antoine Pitrou20db5112011-08-19 20:32:34 +02003423 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003424 self.check_interrupted_write_retry(b"x", mode="wb")
3425
Antoine Pitrou20db5112011-08-19 20:32:34 +02003426 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003427 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3428
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003429
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003430class CSignalsTest(SignalsTest):
3431 io = io
3432
3433class PySignalsTest(SignalsTest):
3434 io = pyio
3435
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003436 # Handling reentrancy issues would slow down _pyio even more, so the
3437 # tests are disabled.
3438 test_reentrant_write_buffered = None
3439 test_reentrant_write_text = None
3440
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003441
Ezio Melottidaa42c72013-03-23 16:30:16 +02003442def load_tests(*args):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003443 tests = (CIOTest, PyIOTest,
3444 CBufferedReaderTest, PyBufferedReaderTest,
3445 CBufferedWriterTest, PyBufferedWriterTest,
3446 CBufferedRWPairTest, PyBufferedRWPairTest,
3447 CBufferedRandomTest, PyBufferedRandomTest,
3448 StatefulIncrementalDecoderTest,
3449 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3450 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003451 CMiscIOTest, PyMiscIOTest,
3452 CSignalsTest, PySignalsTest,
3453 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003454
3455 # Put the namespaces of the IO module we are testing and some useful mock
3456 # classes in the __dict__ of each test.
3457 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003458 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003459 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3460 c_io_ns = {name : getattr(io, name) for name in all_members}
3461 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3462 globs = globals()
3463 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3464 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3465 # Avoid turning open into a bound method.
3466 py_io_ns["open"] = pyio.OpenWrapper
3467 for test in tests:
3468 if test.__name__.startswith("C"):
3469 for name, obj in c_io_ns.items():
3470 setattr(test, name, obj)
3471 elif test.__name__.startswith("Py"):
3472 for name, obj in py_io_ns.items():
3473 setattr(test, name, obj)
3474
Ezio Melottidaa42c72013-03-23 16:30:16 +02003475 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
3476 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003477
3478if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02003479 unittest.main()