blob: e843cacae15ad7c6525b8db6eb61d9cecb2befeb [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Serhiy Storchaka441d30f2013-01-19 12:26:26 +020035import _testcapi
Antoine Pitrou131a4892012-10-16 22:57:11 +020036from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000038from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010047try:
48 import fcntl
49except ImportError:
50 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000052def _default_chunk_size():
53 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000054 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000055 return f._CHUNK_SIZE
56
57
Antoine Pitrou328ec742010-09-14 18:37:24 +000058class MockRawIOWithoutRead:
59 """A RawIO implementation without read(), so as to exercise the default
60 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000061
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000062 def __init__(self, read_stack=()):
63 self._read_stack = list(read_stack)
64 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000065 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000066 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000067
Guido van Rossum01a27522007-03-07 01:00:12 +000068 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000069 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000070 return len(b)
71
72 def writable(self):
73 return True
74
Guido van Rossum68bbcd22007-02-27 17:19:33 +000075 def fileno(self):
76 return 42
77
78 def readable(self):
79 return True
80
Guido van Rossum01a27522007-03-07 01:00:12 +000081 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000082 return True
83
Guido van Rossum01a27522007-03-07 01:00:12 +000084 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000085 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000086
87 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000088 return 0 # same comment as above
89
90 def readinto(self, buf):
91 self._reads += 1
92 max_len = len(buf)
93 try:
94 data = self._read_stack[0]
95 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000096 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000097 return 0
98 if data is None:
99 del self._read_stack[0]
100 return None
101 n = len(data)
102 if len(data) <= max_len:
103 del self._read_stack[0]
104 buf[:n] = data
105 return n
106 else:
107 buf[:] = data[:max_len]
108 self._read_stack[0] = data[max_len:]
109 return max_len
110
111 def truncate(self, pos=None):
112 return pos
113
Antoine Pitrou328ec742010-09-14 18:37:24 +0000114class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
115 pass
116
117class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
118 pass
119
120
121class MockRawIO(MockRawIOWithoutRead):
122
123 def read(self, n=None):
124 self._reads += 1
125 try:
126 return self._read_stack.pop(0)
127 except:
128 self._extraneous_reads += 1
129 return b""
130
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000131class CMockRawIO(MockRawIO, io.RawIOBase):
132 pass
133
134class PyMockRawIO(MockRawIO, pyio.RawIOBase):
135 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000136
Guido van Rossuma9e20242007-03-08 00:43:48 +0000137
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000138class MisbehavedRawIO(MockRawIO):
139 def write(self, b):
140 return super().write(b) * 2
141
142 def read(self, n=None):
143 return super().read(n) * 2
144
145 def seek(self, pos, whence):
146 return -123
147
148 def tell(self):
149 return -456
150
151 def readinto(self, buf):
152 super().readinto(buf)
153 return len(buf) * 5
154
155class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
156 pass
157
158class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
159 pass
160
161
162class CloseFailureIO(MockRawIO):
163 closed = 0
164
165 def close(self):
166 if not self.closed:
167 self.closed = 1
168 raise IOError
169
170class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
171 pass
172
173class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
174 pass
175
176
177class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000178
179 def __init__(self, data):
180 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000182
183 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000184 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000185 self.read_history.append(None if res is None else len(res))
186 return res
187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188 def readinto(self, b):
189 res = super().readinto(b)
190 self.read_history.append(res)
191 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000192
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000193class CMockFileIO(MockFileIO, io.BytesIO):
194 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000195
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000196class PyMockFileIO(MockFileIO, pyio.BytesIO):
197 pass
198
199
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000200class MockUnseekableIO:
201 def seekable(self):
202 return False
203
204 def seek(self, *args):
205 raise self.UnsupportedOperation("not seekable")
206
207 def tell(self, *args):
208 raise self.UnsupportedOperation("not seekable")
209
210class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
211 UnsupportedOperation = io.UnsupportedOperation
212
213class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
214 UnsupportedOperation = pyio.UnsupportedOperation
215
216
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000217class MockNonBlockWriterIO:
218
219 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000220 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000221 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000222
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000223 def pop_written(self):
224 s = b"".join(self._write_stack)
225 self._write_stack[:] = []
226 return s
227
228 def block_on(self, char):
229 """Block when a given char is encountered."""
230 self._blocker_char = char
231
232 def readable(self):
233 return True
234
235 def seekable(self):
236 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
Guido van Rossum01a27522007-03-07 01:00:12 +0000238 def writable(self):
239 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000240
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000241 def write(self, b):
242 b = bytes(b)
243 n = -1
244 if self._blocker_char:
245 try:
246 n = b.index(self._blocker_char)
247 except ValueError:
248 pass
249 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100250 if n > 0:
251 # write data up to the first blocker
252 self._write_stack.append(b[:n])
253 return n
254 else:
255 # cancel blocker and indicate would block
256 self._blocker_char = None
257 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000258 self._write_stack.append(b)
259 return len(b)
260
261class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
262 BlockingIOError = io.BlockingIOError
263
264class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
265 BlockingIOError = pyio.BlockingIOError
266
Guido van Rossuma9e20242007-03-08 00:43:48 +0000267
Guido van Rossum28524c72007-02-27 05:47:44 +0000268class IOTest(unittest.TestCase):
269
Neal Norwitze7789b12008-03-24 06:18:09 +0000270 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000271 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000272
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000273 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000274 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000275
Guido van Rossum28524c72007-02-27 05:47:44 +0000276 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000277 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000278 f.truncate(0)
279 self.assertEqual(f.tell(), 5)
280 f.seek(0)
281
282 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000283 self.assertEqual(f.seek(0), 0)
284 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000285 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000286 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000287 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000288 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000289 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000290 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000291 self.assertEqual(f.seek(-1, 2), 13)
292 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293
Guido van Rossum87429772007-04-10 21:06:59 +0000294 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000295 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000296 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000297
Guido van Rossum9b76da62007-04-11 01:09:03 +0000298 def read_ops(self, f, buffered=False):
299 data = f.read(5)
300 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000301 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000302 self.assertEqual(f.readinto(data), 5)
303 self.assertEqual(data, b" worl")
304 self.assertEqual(f.readinto(data), 2)
305 self.assertEqual(len(data), 5)
306 self.assertEqual(data[:2], b"d\n")
307 self.assertEqual(f.seek(0), 0)
308 self.assertEqual(f.read(20), b"hello world\n")
309 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000310 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000311 self.assertEqual(f.seek(-6, 2), 6)
312 self.assertEqual(f.read(5), b"world")
313 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000314 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 self.assertEqual(f.seek(-6, 1), 5)
316 self.assertEqual(f.read(5), b" worl")
317 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000318 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000319 if buffered:
320 f.seek(0)
321 self.assertEqual(f.read(), b"hello world\n")
322 f.seek(6)
323 self.assertEqual(f.read(), b"world\n")
324 self.assertEqual(f.read(), b"")
325
Guido van Rossum34d69e52007-04-10 20:08:41 +0000326 LARGE = 2**31
327
Guido van Rossum53807da2007-04-10 19:01:47 +0000328 def large_file_ops(self, f):
329 assert f.readable()
330 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000331 self.assertEqual(f.seek(self.LARGE), self.LARGE)
332 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000333 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000334 self.assertEqual(f.tell(), self.LARGE + 3)
335 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000336 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000337 self.assertEqual(f.tell(), self.LARGE + 2)
338 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000339 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000340 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000341 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
342 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000343 self.assertEqual(f.read(2), b"x")
344
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000345 def test_invalid_operations(self):
346 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000347 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000348 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000349 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000350 self.assertRaises(exc, fp.read)
351 self.assertRaises(exc, fp.readline)
352 with self.open(support.TESTFN, "wb", buffering=0) as fp:
353 self.assertRaises(exc, fp.read)
354 self.assertRaises(exc, fp.readline)
355 with self.open(support.TESTFN, "rb", buffering=0) as fp:
356 self.assertRaises(exc, fp.write, b"blah")
357 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000358 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000359 self.assertRaises(exc, fp.write, b"blah")
360 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000361 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000362 self.assertRaises(exc, fp.write, "blah")
363 self.assertRaises(exc, fp.writelines, ["blah\n"])
364 # Non-zero seeking from current or end pos
365 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
366 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000367
Antoine Pitrou13348842012-01-29 18:36:34 +0100368 def test_open_handles_NUL_chars(self):
369 fn_with_NUL = 'foo\0bar'
370 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
371 self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
372
Guido van Rossum28524c72007-02-27 05:47:44 +0000373 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000374 with self.open(support.TESTFN, "wb", buffering=0) as f:
375 self.assertEqual(f.readable(), False)
376 self.assertEqual(f.writable(), True)
377 self.assertEqual(f.seekable(), True)
378 self.write_ops(f)
379 with self.open(support.TESTFN, "rb", buffering=0) as f:
380 self.assertEqual(f.readable(), True)
381 self.assertEqual(f.writable(), False)
382 self.assertEqual(f.seekable(), True)
383 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000384
Guido van Rossum87429772007-04-10 21:06:59 +0000385 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000386 with self.open(support.TESTFN, "wb") as f:
387 self.assertEqual(f.readable(), False)
388 self.assertEqual(f.writable(), True)
389 self.assertEqual(f.seekable(), True)
390 self.write_ops(f)
391 with self.open(support.TESTFN, "rb") as f:
392 self.assertEqual(f.readable(), True)
393 self.assertEqual(f.writable(), False)
394 self.assertEqual(f.seekable(), True)
395 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000396
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000397 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000398 with self.open(support.TESTFN, "wb") as f:
399 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
400 with self.open(support.TESTFN, "rb") as f:
401 self.assertEqual(f.readline(), b"abc\n")
402 self.assertEqual(f.readline(10), b"def\n")
403 self.assertEqual(f.readline(2), b"xy")
404 self.assertEqual(f.readline(4), b"zzy\n")
405 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000406 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000407 self.assertRaises(TypeError, f.readline, 5.3)
408 with self.open(support.TESTFN, "r") as f:
409 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000410
Guido van Rossum28524c72007-02-27 05:47:44 +0000411 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000412 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000413 self.write_ops(f)
414 data = f.getvalue()
415 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000416 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000417 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000418
Guido van Rossum53807da2007-04-10 19:01:47 +0000419 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000420 # On Windows and Mac OSX this test comsumes large resources; It takes
421 # a long time to build the >2GB file and takes >2GB of disk space
422 # therefore the resource must be enabled to run this test.
423 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600424 support.requires(
425 'largefile',
426 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000427 with self.open(support.TESTFN, "w+b", 0) as f:
428 self.large_file_ops(f)
429 with self.open(support.TESTFN, "w+b") as f:
430 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000431
432 def test_with_open(self):
433 for bufsize in (0, 1, 100):
434 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000435 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000436 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000437 self.assertEqual(f.closed, True)
438 f = None
439 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000440 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000441 1/0
442 except ZeroDivisionError:
443 self.assertEqual(f.closed, True)
444 else:
445 self.fail("1/0 didn't raise an exception")
446
Antoine Pitrou08838b62009-01-21 00:55:13 +0000447 # issue 5008
448 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000449 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000450 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000451 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000452 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000454 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000455 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000456 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000457
Guido van Rossum87429772007-04-10 21:06:59 +0000458 def test_destructor(self):
459 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000460 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000461 def __del__(self):
462 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000463 try:
464 f = super().__del__
465 except AttributeError:
466 pass
467 else:
468 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000469 def close(self):
470 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000471 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000472 def flush(self):
473 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000474 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000475 with support.check_warnings(('', ResourceWarning)):
476 f = MyFileIO(support.TESTFN, "wb")
477 f.write(b"xxx")
478 del f
479 support.gc_collect()
480 self.assertEqual(record, [1, 2, 3])
481 with self.open(support.TESTFN, "rb") as f:
482 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000483
484 def _check_base_destructor(self, base):
485 record = []
486 class MyIO(base):
487 def __init__(self):
488 # This exercises the availability of attributes on object
489 # destruction.
490 # (in the C version, close() is called by the tp_dealloc
491 # function, not by __del__)
492 self.on_del = 1
493 self.on_close = 2
494 self.on_flush = 3
495 def __del__(self):
496 record.append(self.on_del)
497 try:
498 f = super().__del__
499 except AttributeError:
500 pass
501 else:
502 f()
503 def close(self):
504 record.append(self.on_close)
505 super().close()
506 def flush(self):
507 record.append(self.on_flush)
508 super().flush()
509 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000510 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000511 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000512 self.assertEqual(record, [1, 2, 3])
513
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000514 def test_IOBase_destructor(self):
515 self._check_base_destructor(self.IOBase)
516
517 def test_RawIOBase_destructor(self):
518 self._check_base_destructor(self.RawIOBase)
519
520 def test_BufferedIOBase_destructor(self):
521 self._check_base_destructor(self.BufferedIOBase)
522
523 def test_TextIOBase_destructor(self):
524 self._check_base_destructor(self.TextIOBase)
525
Guido van Rossum87429772007-04-10 21:06:59 +0000526 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000527 with self.open(support.TESTFN, "wb") as f:
528 f.write(b"xxx")
529 with self.open(support.TESTFN, "rb") as f:
530 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000531
Guido van Rossumd4103952007-04-12 05:44:49 +0000532 def test_array_writes(self):
533 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000534 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000535 with self.open(support.TESTFN, "wb", 0) as f:
536 self.assertEqual(f.write(a), n)
537 with self.open(support.TESTFN, "wb") as f:
538 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000539
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000540 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000541 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000542 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000543
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000544 def test_read_closed(self):
545 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000546 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000547 with self.open(support.TESTFN, "r") as f:
548 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000549 self.assertEqual(file.read(), "egg\n")
550 file.seek(0)
551 file.close()
552 self.assertRaises(ValueError, file.read)
553
554 def test_no_closefd_with_filename(self):
555 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000556 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000557
558 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000559 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000560 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000561 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000562 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000563 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000564 self.assertEqual(file.buffer.raw.closefd, False)
565
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000566 def test_garbage_collection(self):
567 # FileIO objects are collected, and collecting them flushes
568 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000569 with support.check_warnings(('', ResourceWarning)):
570 f = self.FileIO(support.TESTFN, "wb")
571 f.write(b"abcxxx")
572 f.f = f
573 wr = weakref.ref(f)
574 del f
575 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000576 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000577 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000578 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000579
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000580 def test_unbounded_file(self):
581 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
582 zero = "/dev/zero"
583 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000584 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000585 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000586 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000587 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000588 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000589 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000590 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000591 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000592 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000593 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000594 self.assertRaises(OverflowError, f.read)
595
Antoine Pitrou6be88762010-05-03 16:48:20 +0000596 def test_flush_error_on_close(self):
597 f = self.open(support.TESTFN, "wb", buffering=0)
598 def bad_flush():
599 raise IOError()
600 f.flush = bad_flush
601 self.assertRaises(IOError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600602 self.assertTrue(f.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000603
604 def test_multi_close(self):
605 f = self.open(support.TESTFN, "wb", buffering=0)
606 f.close()
607 f.close()
608 f.close()
609 self.assertRaises(ValueError, f.flush)
610
Antoine Pitrou328ec742010-09-14 18:37:24 +0000611 def test_RawIOBase_read(self):
612 # Exercise the default RawIOBase.read() implementation (which calls
613 # readinto() internally).
614 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
615 self.assertEqual(rawio.read(2), b"ab")
616 self.assertEqual(rawio.read(2), b"c")
617 self.assertEqual(rawio.read(2), b"d")
618 self.assertEqual(rawio.read(2), None)
619 self.assertEqual(rawio.read(2), b"ef")
620 self.assertEqual(rawio.read(2), b"g")
621 self.assertEqual(rawio.read(2), None)
622 self.assertEqual(rawio.read(2), b"")
623
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400624 def test_types_have_dict(self):
625 test = (
626 self.IOBase(),
627 self.RawIOBase(),
628 self.TextIOBase(),
629 self.StringIO(),
630 self.BytesIO()
631 )
632 for obj in test:
633 self.assertTrue(hasattr(obj, "__dict__"))
634
Ross Lagerwall59142db2011-10-31 20:34:46 +0200635 def test_opener(self):
636 with self.open(support.TESTFN, "w") as f:
637 f.write("egg\n")
638 fd = os.open(support.TESTFN, os.O_RDONLY)
639 def opener(path, flags):
640 return fd
641 with self.open("non-existent", "r", opener=opener) as f:
642 self.assertEqual(f.read(), "egg\n")
643
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200644 def test_fileio_closefd(self):
645 # Issue #4841
646 with self.open(__file__, 'rb') as f1, \
647 self.open(__file__, 'rb') as f2:
648 fileio = self.FileIO(f1.fileno(), closefd=False)
649 # .__init__() must not close f1
650 fileio.__init__(f2.fileno(), closefd=False)
651 f1.readline()
652 # .close() must not close f2
653 fileio.close()
654 f2.readline()
655
656
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000657class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200658
659 def test_IOBase_finalize(self):
660 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
661 # class which inherits IOBase and an object of this class are caught
662 # in a reference cycle and close() is already in the method cache.
663 class MyIO(self.IOBase):
664 def close(self):
665 pass
666
667 # create an instance to populate the method cache
668 MyIO()
669 obj = MyIO()
670 obj.obj = obj
671 wr = weakref.ref(obj)
672 del MyIO
673 del obj
674 support.gc_collect()
675 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000676
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000677class PyIOTest(IOTest):
678 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000679
Guido van Rossuma9e20242007-03-08 00:43:48 +0000680
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000681class CommonBufferedTests:
682 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
683
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000684 def test_detach(self):
685 raw = self.MockRawIO()
686 buf = self.tp(raw)
687 self.assertIs(buf.detach(), raw)
688 self.assertRaises(ValueError, buf.detach)
689
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000690 def test_fileno(self):
691 rawio = self.MockRawIO()
692 bufio = self.tp(rawio)
693
Ezio Melottib3aedd42010-11-20 19:04:17 +0000694 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000695
Zachary Ware9fe6d862013-12-08 00:20:35 -0600696 @unittest.skip('test having existential crisis')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000697 def test_no_fileno(self):
698 # XXX will we always have fileno() function? If so, kill
699 # this test. Else, write it.
700 pass
701
702 def test_invalid_args(self):
703 rawio = self.MockRawIO()
704 bufio = self.tp(rawio)
705 # Invalid whence
706 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200707 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000708
709 def test_override_destructor(self):
710 tp = self.tp
711 record = []
712 class MyBufferedIO(tp):
713 def __del__(self):
714 record.append(1)
715 try:
716 f = super().__del__
717 except AttributeError:
718 pass
719 else:
720 f()
721 def close(self):
722 record.append(2)
723 super().close()
724 def flush(self):
725 record.append(3)
726 super().flush()
727 rawio = self.MockRawIO()
728 bufio = MyBufferedIO(rawio)
729 writable = bufio.writable()
730 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000731 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000732 if writable:
733 self.assertEqual(record, [1, 2, 3])
734 else:
735 self.assertEqual(record, [1, 2])
736
737 def test_context_manager(self):
738 # Test usability as a context manager
739 rawio = self.MockRawIO()
740 bufio = self.tp(rawio)
741 def _with():
742 with bufio:
743 pass
744 _with()
745 # bufio should now be closed, and using it a second time should raise
746 # a ValueError.
747 self.assertRaises(ValueError, _with)
748
749 def test_error_through_destructor(self):
750 # Test that the exception state is not modified by a destructor,
751 # even if close() fails.
752 rawio = self.CloseFailureIO()
753 def f():
754 self.tp(rawio).xyzzy
755 with support.captured_output("stderr") as s:
756 self.assertRaises(AttributeError, f)
757 s = s.getvalue().strip()
758 if s:
759 # The destructor *may* have printed an unraisable error, check it
760 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000761 self.assertTrue(s.startswith("Exception IOError: "), s)
762 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000763
Antoine Pitrou716c4442009-05-23 19:04:03 +0000764 def test_repr(self):
765 raw = self.MockRawIO()
766 b = self.tp(raw)
767 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
768 self.assertEqual(repr(b), "<%s>" % clsname)
769 raw.name = "dummy"
770 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
771 raw.name = b"dummy"
772 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
773
Antoine Pitrou6be88762010-05-03 16:48:20 +0000774 def test_flush_error_on_close(self):
775 raw = self.MockRawIO()
776 def bad_flush():
777 raise IOError()
778 raw.flush = bad_flush
779 b = self.tp(raw)
780 self.assertRaises(IOError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600781 self.assertTrue(b.closed)
782
783 def test_close_error_on_close(self):
784 raw = self.MockRawIO()
785 def bad_flush():
786 raise IOError('flush')
787 def bad_close():
788 raise IOError('close')
789 raw.close = bad_close
790 b = self.tp(raw)
791 b.flush = bad_flush
792 with self.assertRaises(IOError) as err: # exception not swallowed
793 b.close()
794 self.assertEqual(err.exception.args, ('close',))
795 self.assertEqual(err.exception.__context__.args, ('flush',))
796 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000797
798 def test_multi_close(self):
799 raw = self.MockRawIO()
800 b = self.tp(raw)
801 b.close()
802 b.close()
803 b.close()
804 self.assertRaises(ValueError, b.flush)
805
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000806 def test_unseekable(self):
807 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
808 self.assertRaises(self.UnsupportedOperation, bufio.tell)
809 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
810
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000811 def test_readonly_attributes(self):
812 raw = self.MockRawIO()
813 buf = self.tp(raw)
814 x = self.MockRawIO()
815 with self.assertRaises(AttributeError):
816 buf.raw = x
817
Guido van Rossum78892e42007-04-06 17:31:18 +0000818
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200819class SizeofTest:
820
821 @support.cpython_only
822 def test_sizeof(self):
823 bufsize1 = 4096
824 bufsize2 = 8192
825 rawio = self.MockRawIO()
826 bufio = self.tp(rawio, buffer_size=bufsize1)
827 size = sys.getsizeof(bufio) - bufsize1
828 rawio = self.MockRawIO()
829 bufio = self.tp(rawio, buffer_size=bufsize2)
830 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
831
832
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000833class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
834 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000835
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000836 def test_constructor(self):
837 rawio = self.MockRawIO([b"abc"])
838 bufio = self.tp(rawio)
839 bufio.__init__(rawio)
840 bufio.__init__(rawio, buffer_size=1024)
841 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000842 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000843 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
844 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
845 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
846 rawio = self.MockRawIO([b"abc"])
847 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000848 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000849
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000850 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000851 for arg in (None, 7):
852 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
853 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000854 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000855 # Invalid args
856 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000857
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000858 def test_read1(self):
859 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
860 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000861 self.assertEqual(b"a", bufio.read(1))
862 self.assertEqual(b"b", bufio.read1(1))
863 self.assertEqual(rawio._reads, 1)
864 self.assertEqual(b"c", bufio.read1(100))
865 self.assertEqual(rawio._reads, 1)
866 self.assertEqual(b"d", bufio.read1(100))
867 self.assertEqual(rawio._reads, 2)
868 self.assertEqual(b"efg", bufio.read1(100))
869 self.assertEqual(rawio._reads, 3)
870 self.assertEqual(b"", bufio.read1(100))
871 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000872 # Invalid args
873 self.assertRaises(ValueError, bufio.read1, -1)
874
875 def test_readinto(self):
876 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
877 bufio = self.tp(rawio)
878 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000879 self.assertEqual(bufio.readinto(b), 2)
880 self.assertEqual(b, b"ab")
881 self.assertEqual(bufio.readinto(b), 2)
882 self.assertEqual(b, b"cd")
883 self.assertEqual(bufio.readinto(b), 2)
884 self.assertEqual(b, b"ef")
885 self.assertEqual(bufio.readinto(b), 1)
886 self.assertEqual(b, b"gf")
887 self.assertEqual(bufio.readinto(b), 0)
888 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +0200889 rawio = self.MockRawIO((b"abc", None))
890 bufio = self.tp(rawio)
891 self.assertEqual(bufio.readinto(b), 2)
892 self.assertEqual(b, b"ab")
893 self.assertEqual(bufio.readinto(b), 1)
894 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000895
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000896 def test_readlines(self):
897 def bufio():
898 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
899 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000900 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
901 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
902 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000903
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000904 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000905 data = b"abcdefghi"
906 dlen = len(data)
907
908 tests = [
909 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
910 [ 100, [ 3, 3, 3], [ dlen ] ],
911 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
912 ]
913
914 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000915 rawio = self.MockFileIO(data)
916 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000917 pos = 0
918 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000919 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000920 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000921 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000922 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000923
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000924 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000925 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000926 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
927 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000928 self.assertEqual(b"abcd", bufio.read(6))
929 self.assertEqual(b"e", bufio.read(1))
930 self.assertEqual(b"fg", bufio.read())
931 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200932 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000933 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000934
Victor Stinnera80987f2011-05-25 22:47:16 +0200935 rawio = self.MockRawIO((b"a", None, None))
936 self.assertEqual(b"a", rawio.readall())
937 self.assertIsNone(rawio.readall())
938
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000939 def test_read_past_eof(self):
940 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
941 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000942
Ezio Melottib3aedd42010-11-20 19:04:17 +0000943 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000944
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000945 def test_read_all(self):
946 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
947 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000948
Ezio Melottib3aedd42010-11-20 19:04:17 +0000949 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000950
Victor Stinner45df8202010-04-28 22:31:17 +0000951 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000952 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000953 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000954 try:
955 # Write out many bytes with exactly the same number of 0's,
956 # 1's... 255's. This will help us check that concurrent reading
957 # doesn't duplicate or forget contents.
958 N = 1000
959 l = list(range(256)) * N
960 random.shuffle(l)
961 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000962 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000963 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000964 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000965 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000966 errors = []
967 results = []
968 def f():
969 try:
970 # Intra-buffer read then buffer-flushing read
971 for n in cycle([1, 19]):
972 s = bufio.read(n)
973 if not s:
974 break
975 # list.append() is atomic
976 results.append(s)
977 except Exception as e:
978 errors.append(e)
979 raise
980 threads = [threading.Thread(target=f) for x in range(20)]
981 for t in threads:
982 t.start()
983 time.sleep(0.02) # yield
984 for t in threads:
985 t.join()
986 self.assertFalse(errors,
987 "the following exceptions were caught: %r" % errors)
988 s = b''.join(results)
989 for i in range(256):
990 c = bytes(bytearray([i]))
991 self.assertEqual(s.count(c), N)
992 finally:
993 support.unlink(support.TESTFN)
994
Antoine Pitrou1e44fec2011-10-04 12:26:20 +0200995 def test_unseekable(self):
996 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
997 self.assertRaises(self.UnsupportedOperation, bufio.tell)
998 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
999 bufio.read(1)
1000 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1001 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1002
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001003 def test_misbehaved_io(self):
1004 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1005 bufio = self.tp(rawio)
1006 self.assertRaises(IOError, bufio.seek, 0)
1007 self.assertRaises(IOError, bufio.tell)
1008
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001009 def test_no_extraneous_read(self):
1010 # Issue #9550; when the raw IO object has satisfied the read request,
1011 # we should not issue any additional reads, otherwise it may block
1012 # (e.g. socket).
1013 bufsize = 16
1014 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1015 rawio = self.MockRawIO([b"x" * n])
1016 bufio = self.tp(rawio, bufsize)
1017 self.assertEqual(bufio.read(n), b"x" * n)
1018 # Simple case: one raw read is enough to satisfy the request.
1019 self.assertEqual(rawio._extraneous_reads, 0,
1020 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1021 # A more complex case where two raw reads are needed to satisfy
1022 # the request.
1023 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1024 bufio = self.tp(rawio, bufsize)
1025 self.assertEqual(bufio.read(n), b"x" * n)
1026 self.assertEqual(rawio._extraneous_reads, 0,
1027 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1028
1029
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001030class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001031 tp = io.BufferedReader
1032
1033 def test_constructor(self):
1034 BufferedReaderTest.test_constructor(self)
1035 # The allocation can succeed on 32-bit builds, e.g. with more
1036 # than 2GB RAM and a 64-bit kernel.
1037 if sys.maxsize > 0x7FFFFFFF:
1038 rawio = self.MockRawIO()
1039 bufio = self.tp(rawio)
1040 self.assertRaises((OverflowError, MemoryError, ValueError),
1041 bufio.__init__, rawio, sys.maxsize)
1042
1043 def test_initialization(self):
1044 rawio = self.MockRawIO([b"abc"])
1045 bufio = self.tp(rawio)
1046 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1047 self.assertRaises(ValueError, bufio.read)
1048 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1049 self.assertRaises(ValueError, bufio.read)
1050 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1051 self.assertRaises(ValueError, bufio.read)
1052
1053 def test_misbehaved_io_read(self):
1054 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1055 bufio = self.tp(rawio)
1056 # _pyio.BufferedReader seems to implement reading different, so that
1057 # checking this is not so easy.
1058 self.assertRaises(IOError, bufio.read, 10)
1059
1060 def test_garbage_collection(self):
1061 # C BufferedReader objects are collected.
1062 # The Python version has __del__, so it ends into gc.garbage instead
1063 rawio = self.FileIO(support.TESTFN, "w+b")
1064 f = self.tp(rawio)
1065 f.f = f
1066 wr = weakref.ref(f)
1067 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +00001068 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001069 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001070
R David Murray67bfe802013-02-23 21:51:05 -05001071 def test_args_error(self):
1072 # Issue #17275
1073 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1074 self.tp(io.BytesIO(), 1024, 1024, 1024)
1075
1076
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001077class PyBufferedReaderTest(BufferedReaderTest):
1078 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001079
Guido van Rossuma9e20242007-03-08 00:43:48 +00001080
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001081class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1082 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001083
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001084 def test_constructor(self):
1085 rawio = self.MockRawIO()
1086 bufio = self.tp(rawio)
1087 bufio.__init__(rawio)
1088 bufio.__init__(rawio, buffer_size=1024)
1089 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001090 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001091 bufio.flush()
1092 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1093 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1094 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1095 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001096 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001097 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001098 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001099
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001100 def test_detach_flush(self):
1101 raw = self.MockRawIO()
1102 buf = self.tp(raw)
1103 buf.write(b"howdy!")
1104 self.assertFalse(raw._write_stack)
1105 buf.detach()
1106 self.assertEqual(raw._write_stack, [b"howdy!"])
1107
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001108 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001109 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001110 writer = self.MockRawIO()
1111 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001112 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001113 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001114
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001115 def test_write_overflow(self):
1116 writer = self.MockRawIO()
1117 bufio = self.tp(writer, 8)
1118 contents = b"abcdefghijklmnop"
1119 for n in range(0, len(contents), 3):
1120 bufio.write(contents[n:n+3])
1121 flushed = b"".join(writer._write_stack)
1122 # At least (total - 8) bytes were implicitly flushed, perhaps more
1123 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001124 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001125
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001126 def check_writes(self, intermediate_func):
1127 # Lots of writes, test the flushed output is as expected.
1128 contents = bytes(range(256)) * 1000
1129 n = 0
1130 writer = self.MockRawIO()
1131 bufio = self.tp(writer, 13)
1132 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1133 def gen_sizes():
1134 for size in count(1):
1135 for i in range(15):
1136 yield size
1137 sizes = gen_sizes()
1138 while n < len(contents):
1139 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001140 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001141 intermediate_func(bufio)
1142 n += size
1143 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001144 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001145
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001146 def test_writes(self):
1147 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001148
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001149 def test_writes_and_flushes(self):
1150 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001151
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001152 def test_writes_and_seeks(self):
1153 def _seekabs(bufio):
1154 pos = bufio.tell()
1155 bufio.seek(pos + 1, 0)
1156 bufio.seek(pos - 1, 0)
1157 bufio.seek(pos, 0)
1158 self.check_writes(_seekabs)
1159 def _seekrel(bufio):
1160 pos = bufio.seek(0, 1)
1161 bufio.seek(+1, 1)
1162 bufio.seek(-1, 1)
1163 bufio.seek(pos, 0)
1164 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001165
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001166 def test_writes_and_truncates(self):
1167 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001168
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001169 def test_write_non_blocking(self):
1170 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001171 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001172
Ezio Melottib3aedd42010-11-20 19:04:17 +00001173 self.assertEqual(bufio.write(b"abcd"), 4)
1174 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001175 # 1 byte will be written, the rest will be buffered
1176 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001177 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001178
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001179 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1180 raw.block_on(b"0")
1181 try:
1182 bufio.write(b"opqrwxyz0123456789")
1183 except self.BlockingIOError as e:
1184 written = e.characters_written
1185 else:
1186 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001187 self.assertEqual(written, 16)
1188 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001189 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001190
Ezio Melottib3aedd42010-11-20 19:04:17 +00001191 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001192 s = raw.pop_written()
1193 # Previously buffered bytes were flushed
1194 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001195
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001196 def test_write_and_rewind(self):
1197 raw = io.BytesIO()
1198 bufio = self.tp(raw, 4)
1199 self.assertEqual(bufio.write(b"abcdef"), 6)
1200 self.assertEqual(bufio.tell(), 6)
1201 bufio.seek(0, 0)
1202 self.assertEqual(bufio.write(b"XY"), 2)
1203 bufio.seek(6, 0)
1204 self.assertEqual(raw.getvalue(), b"XYcdef")
1205 self.assertEqual(bufio.write(b"123456"), 6)
1206 bufio.flush()
1207 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001208
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001209 def test_flush(self):
1210 writer = self.MockRawIO()
1211 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001212 bufio.write(b"abc")
1213 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001214 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001215
Antoine Pitrou131a4892012-10-16 22:57:11 +02001216 def test_writelines(self):
1217 l = [b'ab', b'cd', b'ef']
1218 writer = self.MockRawIO()
1219 bufio = self.tp(writer, 8)
1220 bufio.writelines(l)
1221 bufio.flush()
1222 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1223
1224 def test_writelines_userlist(self):
1225 l = UserList([b'ab', b'cd', b'ef'])
1226 writer = self.MockRawIO()
1227 bufio = self.tp(writer, 8)
1228 bufio.writelines(l)
1229 bufio.flush()
1230 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1231
1232 def test_writelines_error(self):
1233 writer = self.MockRawIO()
1234 bufio = self.tp(writer, 8)
1235 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1236 self.assertRaises(TypeError, bufio.writelines, None)
1237 self.assertRaises(TypeError, bufio.writelines, 'abc')
1238
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001239 def test_destructor(self):
1240 writer = self.MockRawIO()
1241 bufio = self.tp(writer, 8)
1242 bufio.write(b"abc")
1243 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001244 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001245 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001246
1247 def test_truncate(self):
1248 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001249 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001250 bufio = self.tp(raw, 8)
1251 bufio.write(b"abcdef")
1252 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001253 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001254 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001255 self.assertEqual(f.read(), b"abc")
1256
Victor Stinner45df8202010-04-28 22:31:17 +00001257 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001258 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001259 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001260 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001261 # Write out many bytes from many threads and test they were
1262 # all flushed.
1263 N = 1000
1264 contents = bytes(range(256)) * N
1265 sizes = cycle([1, 19])
1266 n = 0
1267 queue = deque()
1268 while n < len(contents):
1269 size = next(sizes)
1270 queue.append(contents[n:n+size])
1271 n += size
1272 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001273 # We use a real file object because it allows us to
1274 # exercise situations where the GIL is released before
1275 # writing the buffer to the raw streams. This is in addition
1276 # to concurrency issues due to switching threads in the middle
1277 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001278 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001279 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001280 errors = []
1281 def f():
1282 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001283 while True:
1284 try:
1285 s = queue.popleft()
1286 except IndexError:
1287 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001288 bufio.write(s)
1289 except Exception as e:
1290 errors.append(e)
1291 raise
1292 threads = [threading.Thread(target=f) for x in range(20)]
1293 for t in threads:
1294 t.start()
1295 time.sleep(0.02) # yield
1296 for t in threads:
1297 t.join()
1298 self.assertFalse(errors,
1299 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001300 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001301 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001302 s = f.read()
1303 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001304 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001305 finally:
1306 support.unlink(support.TESTFN)
1307
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001308 def test_misbehaved_io(self):
1309 rawio = self.MisbehavedRawIO()
1310 bufio = self.tp(rawio, 5)
1311 self.assertRaises(IOError, bufio.seek, 0)
1312 self.assertRaises(IOError, bufio.tell)
1313 self.assertRaises(IOError, bufio.write, b"abcdef")
1314
Florent Xicluna109d5732012-07-07 17:03:22 +02001315 def test_max_buffer_size_removal(self):
1316 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001317 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001318
Benjamin Peterson68623612012-12-20 11:53:11 -06001319 def test_write_error_on_close(self):
1320 raw = self.MockRawIO()
1321 def bad_write(b):
1322 raise IOError()
1323 raw.write = bad_write
1324 b = self.tp(raw)
1325 b.write(b'spam')
1326 self.assertRaises(IOError, b.close) # exception not swallowed
1327 self.assertTrue(b.closed)
1328
Benjamin Peterson59406a92009-03-26 17:10:29 +00001329
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001330class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001331 tp = io.BufferedWriter
1332
1333 def test_constructor(self):
1334 BufferedWriterTest.test_constructor(self)
1335 # The allocation can succeed on 32-bit builds, e.g. with more
1336 # than 2GB RAM and a 64-bit kernel.
1337 if sys.maxsize > 0x7FFFFFFF:
1338 rawio = self.MockRawIO()
1339 bufio = self.tp(rawio)
1340 self.assertRaises((OverflowError, MemoryError, ValueError),
1341 bufio.__init__, rawio, sys.maxsize)
1342
1343 def test_initialization(self):
1344 rawio = self.MockRawIO()
1345 bufio = self.tp(rawio)
1346 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1347 self.assertRaises(ValueError, bufio.write, b"def")
1348 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1349 self.assertRaises(ValueError, bufio.write, b"def")
1350 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1351 self.assertRaises(ValueError, bufio.write, b"def")
1352
1353 def test_garbage_collection(self):
1354 # C BufferedWriter objects are collected, and collecting them flushes
1355 # all data to disk.
1356 # The Python version has __del__, so it ends into gc.garbage instead
1357 rawio = self.FileIO(support.TESTFN, "w+b")
1358 f = self.tp(rawio)
1359 f.write(b"123xxx")
1360 f.x = f
1361 wr = weakref.ref(f)
1362 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001363 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001364 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001365 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001366 self.assertEqual(f.read(), b"123xxx")
1367
R David Murray67bfe802013-02-23 21:51:05 -05001368 def test_args_error(self):
1369 # Issue #17275
1370 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1371 self.tp(io.BytesIO(), 1024, 1024, 1024)
1372
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001373
1374class PyBufferedWriterTest(BufferedWriterTest):
1375 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001376
Guido van Rossum01a27522007-03-07 01:00:12 +00001377class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001378
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001379 def test_constructor(self):
1380 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001381 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001382
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001383 def test_detach(self):
1384 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1385 self.assertRaises(self.UnsupportedOperation, pair.detach)
1386
Florent Xicluna109d5732012-07-07 17:03:22 +02001387 def test_constructor_max_buffer_size_removal(self):
1388 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001389 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001390
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001391 def test_constructor_with_not_readable(self):
1392 class NotReadable(MockRawIO):
1393 def readable(self):
1394 return False
1395
1396 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1397
1398 def test_constructor_with_not_writeable(self):
1399 class NotWriteable(MockRawIO):
1400 def writable(self):
1401 return False
1402
1403 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1404
1405 def test_read(self):
1406 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1407
1408 self.assertEqual(pair.read(3), b"abc")
1409 self.assertEqual(pair.read(1), b"d")
1410 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001411 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1412 self.assertEqual(pair.read(None), b"abc")
1413
1414 def test_readlines(self):
1415 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1416 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1417 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1418 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001419
1420 def test_read1(self):
1421 # .read1() is delegated to the underlying reader object, so this test
1422 # can be shallow.
1423 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1424
1425 self.assertEqual(pair.read1(3), b"abc")
1426
1427 def test_readinto(self):
1428 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1429
1430 data = bytearray(5)
1431 self.assertEqual(pair.readinto(data), 5)
1432 self.assertEqual(data, b"abcde")
1433
1434 def test_write(self):
1435 w = self.MockRawIO()
1436 pair = self.tp(self.MockRawIO(), w)
1437
1438 pair.write(b"abc")
1439 pair.flush()
1440 pair.write(b"def")
1441 pair.flush()
1442 self.assertEqual(w._write_stack, [b"abc", b"def"])
1443
1444 def test_peek(self):
1445 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1446
1447 self.assertTrue(pair.peek(3).startswith(b"abc"))
1448 self.assertEqual(pair.read(3), b"abc")
1449
1450 def test_readable(self):
1451 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1452 self.assertTrue(pair.readable())
1453
1454 def test_writeable(self):
1455 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1456 self.assertTrue(pair.writable())
1457
1458 def test_seekable(self):
1459 # BufferedRWPairs are never seekable, even if their readers and writers
1460 # are.
1461 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1462 self.assertFalse(pair.seekable())
1463
1464 # .flush() is delegated to the underlying writer object and has been
1465 # tested in the test_write method.
1466
1467 def test_close_and_closed(self):
1468 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1469 self.assertFalse(pair.closed)
1470 pair.close()
1471 self.assertTrue(pair.closed)
1472
1473 def test_isatty(self):
1474 class SelectableIsAtty(MockRawIO):
1475 def __init__(self, isatty):
1476 MockRawIO.__init__(self)
1477 self._isatty = isatty
1478
1479 def isatty(self):
1480 return self._isatty
1481
1482 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1483 self.assertFalse(pair.isatty())
1484
1485 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1486 self.assertTrue(pair.isatty())
1487
1488 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1489 self.assertTrue(pair.isatty())
1490
1491 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1492 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001493
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001494class CBufferedRWPairTest(BufferedRWPairTest):
1495 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001496
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001497class PyBufferedRWPairTest(BufferedRWPairTest):
1498 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001499
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001500
1501class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1502 read_mode = "rb+"
1503 write_mode = "wb+"
1504
1505 def test_constructor(self):
1506 BufferedReaderTest.test_constructor(self)
1507 BufferedWriterTest.test_constructor(self)
1508
1509 def test_read_and_write(self):
1510 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001511 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001512
1513 self.assertEqual(b"as", rw.read(2))
1514 rw.write(b"ddd")
1515 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001516 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001517 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001518 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001519
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001520 def test_seek_and_tell(self):
1521 raw = self.BytesIO(b"asdfghjkl")
1522 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001523
Ezio Melottib3aedd42010-11-20 19:04:17 +00001524 self.assertEqual(b"as", rw.read(2))
1525 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001526 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001527 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001528
Antoine Pitroue05565e2011-08-20 14:39:23 +02001529 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001530 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001531 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001532 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001533 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001534 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001535 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001536 self.assertEqual(7, rw.tell())
1537 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001538 rw.flush()
1539 self.assertEqual(b"asdf123fl", raw.getvalue())
1540
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001541 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001542
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001543 def check_flush_and_read(self, read_func):
1544 raw = self.BytesIO(b"abcdefghi")
1545 bufio = self.tp(raw)
1546
Ezio Melottib3aedd42010-11-20 19:04:17 +00001547 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001548 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001549 self.assertEqual(b"ef", read_func(bufio, 2))
1550 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001551 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001552 self.assertEqual(6, bufio.tell())
1553 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001554 raw.seek(0, 0)
1555 raw.write(b"XYZ")
1556 # flush() resets the read buffer
1557 bufio.flush()
1558 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001559 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001560
1561 def test_flush_and_read(self):
1562 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1563
1564 def test_flush_and_readinto(self):
1565 def _readinto(bufio, n=-1):
1566 b = bytearray(n if n >= 0 else 9999)
1567 n = bufio.readinto(b)
1568 return bytes(b[:n])
1569 self.check_flush_and_read(_readinto)
1570
1571 def test_flush_and_peek(self):
1572 def _peek(bufio, n=-1):
1573 # This relies on the fact that the buffer can contain the whole
1574 # raw stream, otherwise peek() can return less.
1575 b = bufio.peek(n)
1576 if n != -1:
1577 b = b[:n]
1578 bufio.seek(len(b), 1)
1579 return b
1580 self.check_flush_and_read(_peek)
1581
1582 def test_flush_and_write(self):
1583 raw = self.BytesIO(b"abcdefghi")
1584 bufio = self.tp(raw)
1585
1586 bufio.write(b"123")
1587 bufio.flush()
1588 bufio.write(b"45")
1589 bufio.flush()
1590 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001591 self.assertEqual(b"12345fghi", raw.getvalue())
1592 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001593
1594 def test_threads(self):
1595 BufferedReaderTest.test_threads(self)
1596 BufferedWriterTest.test_threads(self)
1597
1598 def test_writes_and_peek(self):
1599 def _peek(bufio):
1600 bufio.peek(1)
1601 self.check_writes(_peek)
1602 def _peek(bufio):
1603 pos = bufio.tell()
1604 bufio.seek(-1, 1)
1605 bufio.peek(1)
1606 bufio.seek(pos, 0)
1607 self.check_writes(_peek)
1608
1609 def test_writes_and_reads(self):
1610 def _read(bufio):
1611 bufio.seek(-1, 1)
1612 bufio.read(1)
1613 self.check_writes(_read)
1614
1615 def test_writes_and_read1s(self):
1616 def _read1(bufio):
1617 bufio.seek(-1, 1)
1618 bufio.read1(1)
1619 self.check_writes(_read1)
1620
1621 def test_writes_and_readintos(self):
1622 def _read(bufio):
1623 bufio.seek(-1, 1)
1624 bufio.readinto(bytearray(1))
1625 self.check_writes(_read)
1626
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001627 def test_write_after_readahead(self):
1628 # Issue #6629: writing after the buffer was filled by readahead should
1629 # first rewind the raw stream.
1630 for overwrite_size in [1, 5]:
1631 raw = self.BytesIO(b"A" * 10)
1632 bufio = self.tp(raw, 4)
1633 # Trigger readahead
1634 self.assertEqual(bufio.read(1), b"A")
1635 self.assertEqual(bufio.tell(), 1)
1636 # Overwriting should rewind the raw stream if it needs so
1637 bufio.write(b"B" * overwrite_size)
1638 self.assertEqual(bufio.tell(), overwrite_size + 1)
1639 # If the write size was smaller than the buffer size, flush() and
1640 # check that rewind happens.
1641 bufio.flush()
1642 self.assertEqual(bufio.tell(), overwrite_size + 1)
1643 s = raw.getvalue()
1644 self.assertEqual(s,
1645 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1646
Antoine Pitrou7c404892011-05-13 00:13:33 +02001647 def test_write_rewind_write(self):
1648 # Various combinations of reading / writing / seeking backwards / writing again
1649 def mutate(bufio, pos1, pos2):
1650 assert pos2 >= pos1
1651 # Fill the buffer
1652 bufio.seek(pos1)
1653 bufio.read(pos2 - pos1)
1654 bufio.write(b'\x02')
1655 # This writes earlier than the previous write, but still inside
1656 # the buffer.
1657 bufio.seek(pos1)
1658 bufio.write(b'\x01')
1659
1660 b = b"\x80\x81\x82\x83\x84"
1661 for i in range(0, len(b)):
1662 for j in range(i, len(b)):
1663 raw = self.BytesIO(b)
1664 bufio = self.tp(raw, 100)
1665 mutate(bufio, i, j)
1666 bufio.flush()
1667 expected = bytearray(b)
1668 expected[j] = 2
1669 expected[i] = 1
1670 self.assertEqual(raw.getvalue(), expected,
1671 "failed result for i=%d, j=%d" % (i, j))
1672
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001673 def test_truncate_after_read_or_write(self):
1674 raw = self.BytesIO(b"A" * 10)
1675 bufio = self.tp(raw, 100)
1676 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1677 self.assertEqual(bufio.truncate(), 2)
1678 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1679 self.assertEqual(bufio.truncate(), 4)
1680
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001681 def test_misbehaved_io(self):
1682 BufferedReaderTest.test_misbehaved_io(self)
1683 BufferedWriterTest.test_misbehaved_io(self)
1684
Antoine Pitroue05565e2011-08-20 14:39:23 +02001685 def test_interleaved_read_write(self):
1686 # Test for issue #12213
1687 with self.BytesIO(b'abcdefgh') as raw:
1688 with self.tp(raw, 100) as f:
1689 f.write(b"1")
1690 self.assertEqual(f.read(1), b'b')
1691 f.write(b'2')
1692 self.assertEqual(f.read1(1), b'd')
1693 f.write(b'3')
1694 buf = bytearray(1)
1695 f.readinto(buf)
1696 self.assertEqual(buf, b'f')
1697 f.write(b'4')
1698 self.assertEqual(f.peek(1), b'h')
1699 f.flush()
1700 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1701
1702 with self.BytesIO(b'abc') as raw:
1703 with self.tp(raw, 100) as f:
1704 self.assertEqual(f.read(1), b'a')
1705 f.write(b"2")
1706 self.assertEqual(f.read(1), b'c')
1707 f.flush()
1708 self.assertEqual(raw.getvalue(), b'a2c')
1709
1710 def test_interleaved_readline_write(self):
1711 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1712 with self.tp(raw) as f:
1713 f.write(b'1')
1714 self.assertEqual(f.readline(), b'b\n')
1715 f.write(b'2')
1716 self.assertEqual(f.readline(), b'def\n')
1717 f.write(b'3')
1718 self.assertEqual(f.readline(), b'\n')
1719 f.flush()
1720 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1721
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001722 # You can't construct a BufferedRandom over a non-seekable stream.
1723 test_unseekable = None
1724
R David Murray67bfe802013-02-23 21:51:05 -05001725
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001726class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001727 tp = io.BufferedRandom
1728
1729 def test_constructor(self):
1730 BufferedRandomTest.test_constructor(self)
1731 # The allocation can succeed on 32-bit builds, e.g. with more
1732 # than 2GB RAM and a 64-bit kernel.
1733 if sys.maxsize > 0x7FFFFFFF:
1734 rawio = self.MockRawIO()
1735 bufio = self.tp(rawio)
1736 self.assertRaises((OverflowError, MemoryError, ValueError),
1737 bufio.__init__, rawio, sys.maxsize)
1738
1739 def test_garbage_collection(self):
1740 CBufferedReaderTest.test_garbage_collection(self)
1741 CBufferedWriterTest.test_garbage_collection(self)
1742
R David Murray67bfe802013-02-23 21:51:05 -05001743 def test_args_error(self):
1744 # Issue #17275
1745 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
1746 self.tp(io.BytesIO(), 1024, 1024, 1024)
1747
1748
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001749class PyBufferedRandomTest(BufferedRandomTest):
1750 tp = pyio.BufferedRandom
1751
1752
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001753# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1754# properties:
1755# - A single output character can correspond to many bytes of input.
1756# - The number of input bytes to complete the character can be
1757# undetermined until the last input byte is received.
1758# - The number of input bytes can vary depending on previous input.
1759# - A single input byte can correspond to many characters of output.
1760# - The number of output characters can be undetermined until the
1761# last input byte is received.
1762# - The number of output characters can vary depending on previous input.
1763
1764class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1765 """
1766 For testing seek/tell behavior with a stateful, buffering decoder.
1767
1768 Input is a sequence of words. Words may be fixed-length (length set
1769 by input) or variable-length (period-terminated). In variable-length
1770 mode, extra periods are ignored. Possible words are:
1771 - 'i' followed by a number sets the input length, I (maximum 99).
1772 When I is set to 0, words are space-terminated.
1773 - 'o' followed by a number sets the output length, O (maximum 99).
1774 - Any other word is converted into a word followed by a period on
1775 the output. The output word consists of the input word truncated
1776 or padded out with hyphens to make its length equal to O. If O
1777 is 0, the word is output verbatim without truncating or padding.
1778 I and O are initially set to 1. When I changes, any buffered input is
1779 re-scanned according to the new I. EOF also terminates the last word.
1780 """
1781
1782 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001783 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001784 self.reset()
1785
1786 def __repr__(self):
1787 return '<SID %x>' % id(self)
1788
1789 def reset(self):
1790 self.i = 1
1791 self.o = 1
1792 self.buffer = bytearray()
1793
1794 def getstate(self):
1795 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1796 return bytes(self.buffer), i*100 + o
1797
1798 def setstate(self, state):
1799 buffer, io = state
1800 self.buffer = bytearray(buffer)
1801 i, o = divmod(io, 100)
1802 self.i, self.o = i ^ 1, o ^ 1
1803
1804 def decode(self, input, final=False):
1805 output = ''
1806 for b in input:
1807 if self.i == 0: # variable-length, terminated with period
1808 if b == ord('.'):
1809 if self.buffer:
1810 output += self.process_word()
1811 else:
1812 self.buffer.append(b)
1813 else: # fixed-length, terminate after self.i bytes
1814 self.buffer.append(b)
1815 if len(self.buffer) == self.i:
1816 output += self.process_word()
1817 if final and self.buffer: # EOF terminates the last word
1818 output += self.process_word()
1819 return output
1820
1821 def process_word(self):
1822 output = ''
1823 if self.buffer[0] == ord('i'):
1824 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1825 elif self.buffer[0] == ord('o'):
1826 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1827 else:
1828 output = self.buffer.decode('ascii')
1829 if len(output) < self.o:
1830 output += '-'*self.o # pad out with hyphens
1831 if self.o:
1832 output = output[:self.o] # truncate to output length
1833 output += '.'
1834 self.buffer = bytearray()
1835 return output
1836
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001837 codecEnabled = False
1838
1839 @classmethod
1840 def lookupTestDecoder(cls, name):
1841 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001842 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001843 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001844 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001845 incrementalencoder=None,
1846 streamreader=None, streamwriter=None,
1847 incrementaldecoder=cls)
1848
1849# Register the previous decoder for testing.
1850# Disabled by default, tests will enable it.
1851codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1852
1853
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001854class StatefulIncrementalDecoderTest(unittest.TestCase):
1855 """
1856 Make sure the StatefulIncrementalDecoder actually works.
1857 """
1858
1859 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001860 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001861 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001862 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001863 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001864 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001865 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001866 # I=0, O=6 (variable-length input, fixed-length output)
1867 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1868 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001869 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001870 # I=6, O=3 (fixed-length input > fixed-length output)
1871 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1872 # I=0, then 3; O=29, then 15 (with longer output)
1873 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1874 'a----------------------------.' +
1875 'b----------------------------.' +
1876 'cde--------------------------.' +
1877 'abcdefghijabcde.' +
1878 'a.b------------.' +
1879 '.c.------------.' +
1880 'd.e------------.' +
1881 'k--------------.' +
1882 'l--------------.' +
1883 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001884 ]
1885
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001886 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001887 # Try a few one-shot test cases.
1888 for input, eof, output in self.test_cases:
1889 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001890 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001891
1892 # Also test an unfinished decode, followed by forcing EOF.
1893 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001894 self.assertEqual(d.decode(b'oiabcd'), '')
1895 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001896
1897class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001898
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001899 def setUp(self):
1900 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1901 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001902 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001903
Guido van Rossumd0712812007-04-11 16:32:43 +00001904 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001905 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001906
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001907 def test_constructor(self):
1908 r = self.BytesIO(b"\xc3\xa9\n\n")
1909 b = self.BufferedReader(r, 1000)
1910 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001911 t.__init__(b, encoding="latin-1", newline="\r\n")
1912 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001913 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001914 t.__init__(b, encoding="utf-8", line_buffering=True)
1915 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001916 self.assertEqual(t.line_buffering, True)
1917 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001918 self.assertRaises(TypeError, t.__init__, b, newline=42)
1919 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1920
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001921 def test_detach(self):
1922 r = self.BytesIO()
1923 b = self.BufferedWriter(r)
1924 t = self.TextIOWrapper(b)
1925 self.assertIs(t.detach(), b)
1926
1927 t = self.TextIOWrapper(b, encoding="ascii")
1928 t.write("howdy")
1929 self.assertFalse(r.getvalue())
1930 t.detach()
1931 self.assertEqual(r.getvalue(), b"howdy")
1932 self.assertRaises(ValueError, t.detach)
1933
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001934 def test_repr(self):
1935 raw = self.BytesIO("hello".encode("utf-8"))
1936 b = self.BufferedReader(raw)
1937 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001938 modname = self.TextIOWrapper.__module__
1939 self.assertEqual(repr(t),
1940 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1941 raw.name = "dummy"
1942 self.assertEqual(repr(t),
1943 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001944 t.mode = "r"
1945 self.assertEqual(repr(t),
1946 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001947 raw.name = b"dummy"
1948 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001949 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001950
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001951 def test_line_buffering(self):
1952 r = self.BytesIO()
1953 b = self.BufferedWriter(r, 1000)
1954 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001955 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001956 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001957 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001958 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001959 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001960 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001961
Victor Stinnerf86a5e82012-06-05 13:43:22 +02001962 def test_default_encoding(self):
1963 old_environ = dict(os.environ)
1964 try:
1965 # try to get a user preferred encoding different than the current
1966 # locale encoding to check that TextIOWrapper() uses the current
1967 # locale encoding and not the user preferred encoding
1968 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
1969 if key in os.environ:
1970 del os.environ[key]
1971
1972 current_locale_encoding = locale.getpreferredencoding(False)
1973 b = self.BytesIO()
1974 t = self.TextIOWrapper(b)
1975 self.assertEqual(t.encoding, current_locale_encoding)
1976 finally:
1977 os.environ.clear()
1978 os.environ.update(old_environ)
1979
Serhiy Storchaka441d30f2013-01-19 12:26:26 +02001980 # Issue 15989
1981 def test_device_encoding(self):
1982 b = self.BytesIO()
1983 b.fileno = lambda: _testcapi.INT_MAX + 1
1984 self.assertRaises(OverflowError, self.TextIOWrapper, b)
1985 b.fileno = lambda: _testcapi.UINT_MAX + 1
1986 self.assertRaises(OverflowError, self.TextIOWrapper, b)
1987
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001988 def test_encoding(self):
1989 # Check the encoding attribute is always set, and valid
1990 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001991 t = self.TextIOWrapper(b, encoding="utf-8")
1992 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001993 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001994 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001995 codecs.lookup(t.encoding)
1996
1997 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001998 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001999 b = self.BytesIO(b"abc\n\xff\n")
2000 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002001 self.assertRaises(UnicodeError, t.read)
2002 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002003 b = self.BytesIO(b"abc\n\xff\n")
2004 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002005 self.assertRaises(UnicodeError, t.read)
2006 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002007 b = self.BytesIO(b"abc\n\xff\n")
2008 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002009 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002010 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002011 b = self.BytesIO(b"abc\n\xff\n")
2012 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002013 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002014
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002015 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002016 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002017 b = self.BytesIO()
2018 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002019 self.assertRaises(UnicodeError, t.write, "\xff")
2020 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002021 b = self.BytesIO()
2022 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002023 self.assertRaises(UnicodeError, t.write, "\xff")
2024 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002025 b = self.BytesIO()
2026 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002027 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002028 t.write("abc\xffdef\n")
2029 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002030 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002031 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002032 b = self.BytesIO()
2033 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002034 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002035 t.write("abc\xffdef\n")
2036 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002037 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002038
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002039 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002040 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2041
2042 tests = [
2043 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002044 [ '', input_lines ],
2045 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2046 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2047 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002048 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002049 encodings = (
2050 'utf-8', 'latin-1',
2051 'utf-16', 'utf-16-le', 'utf-16-be',
2052 'utf-32', 'utf-32-le', 'utf-32-be',
2053 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002054
Guido van Rossum8358db22007-08-18 21:39:55 +00002055 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002056 # character in TextIOWrapper._pending_line.
2057 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002058 # XXX: str.encode() should return bytes
2059 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002060 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002061 for bufsize in range(1, 10):
2062 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002063 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2064 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002065 encoding=encoding)
2066 if do_reads:
2067 got_lines = []
2068 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002069 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002070 if c2 == '':
2071 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002072 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002073 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002074 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002075 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002076
2077 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002078 self.assertEqual(got_line, exp_line)
2079 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002080
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002081 def test_newlines_input(self):
2082 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002083 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2084 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002085 (None, normalized.decode("ascii").splitlines(keepends=True)),
2086 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002087 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2088 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2089 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002090 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002091 buf = self.BytesIO(testdata)
2092 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002093 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002094 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002095 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002096
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002097 def test_newlines_output(self):
2098 testdict = {
2099 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2100 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2101 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2102 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2103 }
2104 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2105 for newline, expected in tests:
2106 buf = self.BytesIO()
2107 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2108 txt.write("AAA\nB")
2109 txt.write("BB\nCCC\n")
2110 txt.write("X\rY\r\nZ")
2111 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002112 self.assertEqual(buf.closed, False)
2113 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002114
2115 def test_destructor(self):
2116 l = []
2117 base = self.BytesIO
2118 class MyBytesIO(base):
2119 def close(self):
2120 l.append(self.getvalue())
2121 base.close(self)
2122 b = MyBytesIO()
2123 t = self.TextIOWrapper(b, encoding="ascii")
2124 t.write("abc")
2125 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002126 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002127 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002128
2129 def test_override_destructor(self):
2130 record = []
2131 class MyTextIO(self.TextIOWrapper):
2132 def __del__(self):
2133 record.append(1)
2134 try:
2135 f = super().__del__
2136 except AttributeError:
2137 pass
2138 else:
2139 f()
2140 def close(self):
2141 record.append(2)
2142 super().close()
2143 def flush(self):
2144 record.append(3)
2145 super().flush()
2146 b = self.BytesIO()
2147 t = MyTextIO(b, encoding="ascii")
2148 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002149 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002150 self.assertEqual(record, [1, 2, 3])
2151
2152 def test_error_through_destructor(self):
2153 # Test that the exception state is not modified by a destructor,
2154 # even if close() fails.
2155 rawio = self.CloseFailureIO()
2156 def f():
2157 self.TextIOWrapper(rawio).xyzzy
2158 with support.captured_output("stderr") as s:
2159 self.assertRaises(AttributeError, f)
2160 s = s.getvalue().strip()
2161 if s:
2162 # The destructor *may* have printed an unraisable error, check it
2163 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002164 self.assertTrue(s.startswith("Exception IOError: "), s)
2165 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002166
Guido van Rossum9b76da62007-04-11 01:09:03 +00002167 # Systematic tests of the text I/O API
2168
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002169 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002170 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002171 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002172 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002173 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002174 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002175 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002176 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002177 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002178 self.assertEqual(f.tell(), 0)
2179 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002180 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002181 self.assertEqual(f.seek(0), 0)
2182 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002183 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002184 self.assertEqual(f.read(2), "ab")
2185 self.assertEqual(f.read(1), "c")
2186 self.assertEqual(f.read(1), "")
2187 self.assertEqual(f.read(), "")
2188 self.assertEqual(f.tell(), cookie)
2189 self.assertEqual(f.seek(0), 0)
2190 self.assertEqual(f.seek(0, 2), cookie)
2191 self.assertEqual(f.write("def"), 3)
2192 self.assertEqual(f.seek(cookie), cookie)
2193 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002194 if enc.startswith("utf"):
2195 self.multi_line_test(f, enc)
2196 f.close()
2197
2198 def multi_line_test(self, f, enc):
2199 f.seek(0)
2200 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002201 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002202 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002203 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002204 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002205 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002206 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002207 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002208 wlines.append((f.tell(), line))
2209 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002210 f.seek(0)
2211 rlines = []
2212 while True:
2213 pos = f.tell()
2214 line = f.readline()
2215 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002216 break
2217 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002218 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002219
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002220 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002221 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002222 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002223 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002224 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002225 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002226 p2 = f.tell()
2227 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002228 self.assertEqual(f.tell(), p0)
2229 self.assertEqual(f.readline(), "\xff\n")
2230 self.assertEqual(f.tell(), p1)
2231 self.assertEqual(f.readline(), "\xff\n")
2232 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002233 f.seek(0)
2234 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002235 self.assertEqual(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002236 self.assertRaises(IOError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002237 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002238 f.close()
2239
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002240 def test_seeking(self):
2241 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002242 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002243 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002244 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002245 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002246 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002247 suffix = bytes(u_suffix.encode("utf-8"))
2248 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002249 with self.open(support.TESTFN, "wb") as f:
2250 f.write(line*2)
2251 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2252 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002253 self.assertEqual(s, str(prefix, "ascii"))
2254 self.assertEqual(f.tell(), prefix_size)
2255 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002256
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002257 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002258 # Regression test for a specific bug
2259 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002260 with self.open(support.TESTFN, "wb") as f:
2261 f.write(data)
2262 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2263 f._CHUNK_SIZE # Just test that it exists
2264 f._CHUNK_SIZE = 2
2265 f.readline()
2266 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002267
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002268 def test_seek_and_tell(self):
2269 #Test seek/tell using the StatefulIncrementalDecoder.
2270 # Make test faster by doing smaller seeks
2271 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002272
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002273 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002274 """Tell/seek to various points within a data stream and ensure
2275 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002276 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002277 f.write(data)
2278 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002279 f = self.open(support.TESTFN, encoding='test_decoder')
2280 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002281 decoded = f.read()
2282 f.close()
2283
Neal Norwitze2b07052008-03-18 19:52:05 +00002284 for i in range(min_pos, len(decoded) + 1): # seek positions
2285 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002286 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002287 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002288 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002289 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002290 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002291 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002292 f.close()
2293
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002294 # Enable the test decoder.
2295 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002296
2297 # Run the tests.
2298 try:
2299 # Try each test case.
2300 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002301 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002302
2303 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002304 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2305 offset = CHUNK_SIZE - len(input)//2
2306 prefix = b'.'*offset
2307 # Don't bother seeking into the prefix (takes too long).
2308 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002309 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002310
2311 # Ensure our test decoder won't interfere with subsequent tests.
2312 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002313 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002314
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002315 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002316 data = "1234567890"
2317 tests = ("utf-16",
2318 "utf-16-le",
2319 "utf-16-be",
2320 "utf-32",
2321 "utf-32-le",
2322 "utf-32-be")
2323 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002324 buf = self.BytesIO()
2325 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002326 # Check if the BOM is written only once (see issue1753).
2327 f.write(data)
2328 f.write(data)
2329 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002330 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002331 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002332 self.assertEqual(f.read(), data * 2)
2333 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002334
Benjamin Petersona1b49012009-03-31 23:11:32 +00002335 def test_unreadable(self):
2336 class UnReadable(self.BytesIO):
2337 def readable(self):
2338 return False
2339 txt = self.TextIOWrapper(UnReadable())
2340 self.assertRaises(IOError, txt.read)
2341
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002342 def test_read_one_by_one(self):
2343 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002344 reads = ""
2345 while True:
2346 c = txt.read(1)
2347 if not c:
2348 break
2349 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002350 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002351
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002352 def test_readlines(self):
2353 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2354 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2355 txt.seek(0)
2356 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2357 txt.seek(0)
2358 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2359
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002360 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002361 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002362 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002363 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002364 reads = ""
2365 while True:
2366 c = txt.read(128)
2367 if not c:
2368 break
2369 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002370 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002371
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002372 def test_writelines(self):
2373 l = ['ab', 'cd', 'ef']
2374 buf = self.BytesIO()
2375 txt = self.TextIOWrapper(buf)
2376 txt.writelines(l)
2377 txt.flush()
2378 self.assertEqual(buf.getvalue(), b'abcdef')
2379
2380 def test_writelines_userlist(self):
2381 l = UserList(['ab', 'cd', 'ef'])
2382 buf = self.BytesIO()
2383 txt = self.TextIOWrapper(buf)
2384 txt.writelines(l)
2385 txt.flush()
2386 self.assertEqual(buf.getvalue(), b'abcdef')
2387
2388 def test_writelines_error(self):
2389 txt = self.TextIOWrapper(self.BytesIO())
2390 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2391 self.assertRaises(TypeError, txt.writelines, None)
2392 self.assertRaises(TypeError, txt.writelines, b'abc')
2393
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002394 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002395 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002396
2397 # read one char at a time
2398 reads = ""
2399 while True:
2400 c = txt.read(1)
2401 if not c:
2402 break
2403 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002404 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002405
2406 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002407 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002408 txt._CHUNK_SIZE = 4
2409
2410 reads = ""
2411 while True:
2412 c = txt.read(4)
2413 if not c:
2414 break
2415 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002416 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002417
2418 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002419 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002420 txt._CHUNK_SIZE = 4
2421
2422 reads = txt.read(4)
2423 reads += txt.read(4)
2424 reads += txt.readline()
2425 reads += txt.readline()
2426 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002427 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002428
2429 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002430 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002431 txt._CHUNK_SIZE = 4
2432
2433 reads = txt.read(4)
2434 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002435 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002436
2437 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002438 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002439 txt._CHUNK_SIZE = 4
2440
2441 reads = txt.read(4)
2442 pos = txt.tell()
2443 txt.seek(0)
2444 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002445 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002446
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002447 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002448 buffer = self.BytesIO(self.testdata)
2449 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002450
2451 self.assertEqual(buffer.seekable(), txt.seekable())
2452
Antoine Pitroue4501852009-05-14 18:55:55 +00002453 def test_append_bom(self):
2454 # The BOM is not written again when appending to a non-empty file
2455 filename = support.TESTFN
2456 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2457 with self.open(filename, 'w', encoding=charset) as f:
2458 f.write('aaa')
2459 pos = f.tell()
2460 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002461 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002462
2463 with self.open(filename, 'a', encoding=charset) as f:
2464 f.write('xxx')
2465 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002466 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002467
2468 def test_seek_bom(self):
2469 # Same test, but when seeking manually
2470 filename = support.TESTFN
2471 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2472 with self.open(filename, 'w', encoding=charset) as f:
2473 f.write('aaa')
2474 pos = f.tell()
2475 with self.open(filename, 'r+', encoding=charset) as f:
2476 f.seek(pos)
2477 f.write('zzz')
2478 f.seek(0)
2479 f.write('bbb')
2480 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002481 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002482
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002483 def test_errors_property(self):
2484 with self.open(support.TESTFN, "w") as f:
2485 self.assertEqual(f.errors, "strict")
2486 with self.open(support.TESTFN, "w", errors="replace") as f:
2487 self.assertEqual(f.errors, "replace")
2488
Brett Cannon31f59292011-02-21 19:29:56 +00002489 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002490 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002491 def test_threads_write(self):
2492 # Issue6750: concurrent writes could duplicate data
2493 event = threading.Event()
2494 with self.open(support.TESTFN, "w", buffering=1) as f:
2495 def run(n):
2496 text = "Thread%03d\n" % n
2497 event.wait()
2498 f.write(text)
2499 threads = [threading.Thread(target=lambda n=x: run(n))
2500 for x in range(20)]
2501 for t in threads:
2502 t.start()
2503 time.sleep(0.02)
2504 event.set()
2505 for t in threads:
2506 t.join()
2507 with self.open(support.TESTFN) as f:
2508 content = f.read()
2509 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002510 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002511
Antoine Pitrou6be88762010-05-03 16:48:20 +00002512 def test_flush_error_on_close(self):
2513 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2514 def bad_flush():
2515 raise IOError()
2516 txt.flush = bad_flush
2517 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002518 self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002519
2520 def test_multi_close(self):
2521 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2522 txt.close()
2523 txt.close()
2524 txt.close()
2525 self.assertRaises(ValueError, txt.flush)
2526
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002527 def test_unseekable(self):
2528 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2529 self.assertRaises(self.UnsupportedOperation, txt.tell)
2530 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2531
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002532 def test_readonly_attributes(self):
2533 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2534 buf = self.BytesIO(self.testdata)
2535 with self.assertRaises(AttributeError):
2536 txt.buffer = buf
2537
Antoine Pitroue96ec682011-07-23 21:46:35 +02002538 def test_rawio(self):
2539 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2540 # that subprocess.Popen() can have the required unbuffered
2541 # semantics with universal_newlines=True.
2542 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2543 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2544 # Reads
2545 self.assertEqual(txt.read(4), 'abcd')
2546 self.assertEqual(txt.readline(), 'efghi\n')
2547 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2548
2549 def test_rawio_write_through(self):
2550 # Issue #12591: with write_through=True, writes don't need a flush
2551 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2552 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2553 write_through=True)
2554 txt.write('1')
2555 txt.write('23\n4')
2556 txt.write('5')
2557 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2558
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002559 def test_read_nonbytes(self):
2560 # Issue #17106
2561 # Crash when underlying read() returns non-bytes
2562 t = self.TextIOWrapper(self.StringIO('a'))
2563 self.assertRaises(TypeError, t.read, 1)
2564 t = self.TextIOWrapper(self.StringIO('a'))
2565 self.assertRaises(TypeError, t.readline)
2566 t = self.TextIOWrapper(self.StringIO('a'))
2567 self.assertRaises(TypeError, t.read)
2568
2569 def test_illegal_decoder(self):
2570 # Issue #17106
2571 # Crash when decoder returns non-string
2572 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2573 encoding='quopri_codec')
2574 self.assertRaises(TypeError, t.read, 1)
2575 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2576 encoding='quopri_codec')
2577 self.assertRaises(TypeError, t.readline)
2578 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2579 encoding='quopri_codec')
2580 self.assertRaises(TypeError, t.read)
2581
2582
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002583class CTextIOWrapperTest(TextIOWrapperTest):
2584
2585 def test_initialization(self):
2586 r = self.BytesIO(b"\xc3\xa9\n\n")
2587 b = self.BufferedReader(r, 1000)
2588 t = self.TextIOWrapper(b)
2589 self.assertRaises(TypeError, t.__init__, b, newline=42)
2590 self.assertRaises(ValueError, t.read)
2591 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2592 self.assertRaises(ValueError, t.read)
2593
2594 def test_garbage_collection(self):
2595 # C TextIOWrapper objects are collected, and collecting them flushes
2596 # all data to disk.
2597 # The Python version has __del__, so it ends in gc.garbage instead.
2598 rawio = io.FileIO(support.TESTFN, "wb")
2599 b = self.BufferedWriter(rawio)
2600 t = self.TextIOWrapper(b, encoding="ascii")
2601 t.write("456def")
2602 t.x = t
2603 wr = weakref.ref(t)
2604 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002605 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002606 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002607 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002608 self.assertEqual(f.read(), b"456def")
2609
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002610 def test_rwpair_cleared_before_textio(self):
2611 # Issue 13070: TextIOWrapper's finalization would crash when called
2612 # after the reference to the underlying BufferedRWPair's writer got
2613 # cleared by the GC.
2614 for i in range(1000):
2615 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2616 t1 = self.TextIOWrapper(b1, encoding="ascii")
2617 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2618 t2 = self.TextIOWrapper(b2, encoding="ascii")
2619 # circular references
2620 t1.buddy = t2
2621 t2.buddy = t1
2622 support.gc_collect()
2623
2624
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002625class PyTextIOWrapperTest(TextIOWrapperTest):
2626 pass
2627
2628
2629class IncrementalNewlineDecoderTest(unittest.TestCase):
2630
2631 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002632 # UTF-8 specific tests for a newline decoder
2633 def _check_decode(b, s, **kwargs):
2634 # We exercise getstate() / setstate() as well as decode()
2635 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002636 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002637 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002638 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002639
Antoine Pitrou180a3362008-12-14 16:36:46 +00002640 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002641
Antoine Pitrou180a3362008-12-14 16:36:46 +00002642 _check_decode(b'\xe8', "")
2643 _check_decode(b'\xa2', "")
2644 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002645
Antoine Pitrou180a3362008-12-14 16:36:46 +00002646 _check_decode(b'\xe8', "")
2647 _check_decode(b'\xa2', "")
2648 _check_decode(b'\x88', "\u8888")
2649
2650 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002651 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2652
Antoine Pitrou180a3362008-12-14 16:36:46 +00002653 decoder.reset()
2654 _check_decode(b'\n', "\n")
2655 _check_decode(b'\r', "")
2656 _check_decode(b'', "\n", final=True)
2657 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002658
Antoine Pitrou180a3362008-12-14 16:36:46 +00002659 _check_decode(b'\r', "")
2660 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002661
Antoine Pitrou180a3362008-12-14 16:36:46 +00002662 _check_decode(b'\r\r\n', "\n\n")
2663 _check_decode(b'\r', "")
2664 _check_decode(b'\r', "\n")
2665 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002666
Antoine Pitrou180a3362008-12-14 16:36:46 +00002667 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2668 _check_decode(b'\xe8\xa2\x88', "\u8888")
2669 _check_decode(b'\n', "\n")
2670 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2671 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002672
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002673 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002674 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002675 if encoding is not None:
2676 encoder = codecs.getincrementalencoder(encoding)()
2677 def _decode_bytewise(s):
2678 # Decode one byte at a time
2679 for b in encoder.encode(s):
2680 result.append(decoder.decode(bytes([b])))
2681 else:
2682 encoder = None
2683 def _decode_bytewise(s):
2684 # Decode one char at a time
2685 for c in s:
2686 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002687 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002688 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002689 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002690 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002691 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002692 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002693 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002694 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002695 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002696 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002697 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002698 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002699 input = "abc"
2700 if encoder is not None:
2701 encoder.reset()
2702 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002703 self.assertEqual(decoder.decode(input), "abc")
2704 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002705
2706 def test_newline_decoder(self):
2707 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002708 # None meaning the IncrementalNewlineDecoder takes unicode input
2709 # rather than bytes input
2710 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002711 'utf-16', 'utf-16-le', 'utf-16-be',
2712 'utf-32', 'utf-32-le', 'utf-32-be',
2713 )
2714 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002715 decoder = enc and codecs.getincrementaldecoder(enc)()
2716 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2717 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002718 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002719 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2720 self.check_newline_decoding_utf8(decoder)
2721
Antoine Pitrou66913e22009-03-06 23:40:56 +00002722 def test_newline_bytes(self):
2723 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2724 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002725 self.assertEqual(dec.newlines, None)
2726 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2727 self.assertEqual(dec.newlines, None)
2728 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2729 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002730 dec = self.IncrementalNewlineDecoder(None, translate=False)
2731 _check(dec)
2732 dec = self.IncrementalNewlineDecoder(None, translate=True)
2733 _check(dec)
2734
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002735class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2736 pass
2737
2738class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2739 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002740
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002741
Guido van Rossum01a27522007-03-07 01:00:12 +00002742# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002743
Guido van Rossum5abbf752007-08-27 17:39:33 +00002744class MiscIOTest(unittest.TestCase):
2745
Barry Warsaw40e82462008-11-20 20:14:50 +00002746 def tearDown(self):
2747 support.unlink(support.TESTFN)
2748
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002749 def test___all__(self):
2750 for name in self.io.__all__:
2751 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002752 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002753 if name == "open":
2754 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002755 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002756 self.assertTrue(issubclass(obj, Exception), name)
2757 elif not name.startswith("SEEK_"):
2758 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002759
Barry Warsaw40e82462008-11-20 20:14:50 +00002760 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002761 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002762 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002763 f.close()
2764
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002765 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002766 self.assertEqual(f.name, support.TESTFN)
2767 self.assertEqual(f.buffer.name, support.TESTFN)
2768 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2769 self.assertEqual(f.mode, "U")
2770 self.assertEqual(f.buffer.mode, "rb")
2771 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002772 f.close()
2773
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002774 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002775 self.assertEqual(f.mode, "w+")
2776 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2777 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002778
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002779 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002780 self.assertEqual(g.mode, "wb")
2781 self.assertEqual(g.raw.mode, "wb")
2782 self.assertEqual(g.name, f.fileno())
2783 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002784 f.close()
2785 g.close()
2786
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002787 def test_io_after_close(self):
2788 for kwargs in [
2789 {"mode": "w"},
2790 {"mode": "wb"},
2791 {"mode": "w", "buffering": 1},
2792 {"mode": "w", "buffering": 2},
2793 {"mode": "wb", "buffering": 0},
2794 {"mode": "r"},
2795 {"mode": "rb"},
2796 {"mode": "r", "buffering": 1},
2797 {"mode": "r", "buffering": 2},
2798 {"mode": "rb", "buffering": 0},
2799 {"mode": "w+"},
2800 {"mode": "w+b"},
2801 {"mode": "w+", "buffering": 1},
2802 {"mode": "w+", "buffering": 2},
2803 {"mode": "w+b", "buffering": 0},
2804 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002805 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002806 f.close()
2807 self.assertRaises(ValueError, f.flush)
2808 self.assertRaises(ValueError, f.fileno)
2809 self.assertRaises(ValueError, f.isatty)
2810 self.assertRaises(ValueError, f.__iter__)
2811 if hasattr(f, "peek"):
2812 self.assertRaises(ValueError, f.peek, 1)
2813 self.assertRaises(ValueError, f.read)
2814 if hasattr(f, "read1"):
2815 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002816 if hasattr(f, "readall"):
2817 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002818 if hasattr(f, "readinto"):
2819 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2820 self.assertRaises(ValueError, f.readline)
2821 self.assertRaises(ValueError, f.readlines)
2822 self.assertRaises(ValueError, f.seek, 0)
2823 self.assertRaises(ValueError, f.tell)
2824 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002825 self.assertRaises(ValueError, f.write,
2826 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002827 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002828 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002829
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002830 def test_blockingioerror(self):
2831 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002832 class C(str):
2833 pass
2834 c = C("")
2835 b = self.BlockingIOError(1, c)
2836 c.b = b
2837 b.c = c
2838 wr = weakref.ref(c)
2839 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002840 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002841 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002842
2843 def test_abcs(self):
2844 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002845 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2846 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2847 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2848 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002849
2850 def _check_abc_inheritance(self, abcmodule):
2851 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002852 self.assertIsInstance(f, abcmodule.IOBase)
2853 self.assertIsInstance(f, abcmodule.RawIOBase)
2854 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2855 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002856 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002857 self.assertIsInstance(f, abcmodule.IOBase)
2858 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2859 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2860 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002861 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002862 self.assertIsInstance(f, abcmodule.IOBase)
2863 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2864 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2865 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002866
2867 def test_abc_inheritance(self):
2868 # Test implementations inherit from their respective ABCs
2869 self._check_abc_inheritance(self)
2870
2871 def test_abc_inheritance_official(self):
2872 # Test implementations inherit from the official ABCs of the
2873 # baseline "io" module.
2874 self._check_abc_inheritance(io)
2875
Antoine Pitroue033e062010-10-29 10:38:18 +00002876 def _check_warn_on_dealloc(self, *args, **kwargs):
2877 f = open(*args, **kwargs)
2878 r = repr(f)
2879 with self.assertWarns(ResourceWarning) as cm:
2880 f = None
2881 support.gc_collect()
2882 self.assertIn(r, str(cm.warning.args[0]))
2883
2884 def test_warn_on_dealloc(self):
2885 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2886 self._check_warn_on_dealloc(support.TESTFN, "wb")
2887 self._check_warn_on_dealloc(support.TESTFN, "w")
2888
2889 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2890 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002891 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002892 for fd in fds:
2893 try:
2894 os.close(fd)
2895 except EnvironmentError as e:
2896 if e.errno != errno.EBADF:
2897 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002898 self.addCleanup(cleanup_fds)
2899 r, w = os.pipe()
2900 fds += r, w
2901 self._check_warn_on_dealloc(r, *args, **kwargs)
2902 # When using closefd=False, there's no warning
2903 r, w = os.pipe()
2904 fds += r, w
2905 with warnings.catch_warnings(record=True) as recorded:
2906 open(r, *args, closefd=False, **kwargs)
2907 support.gc_collect()
2908 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002909
2910 def test_warn_on_dealloc_fd(self):
2911 self._check_warn_on_dealloc_fd("rb", buffering=0)
2912 self._check_warn_on_dealloc_fd("rb")
2913 self._check_warn_on_dealloc_fd("r")
2914
2915
Antoine Pitrou243757e2010-11-05 21:15:39 +00002916 def test_pickling(self):
2917 # Pickling file objects is forbidden
2918 for kwargs in [
2919 {"mode": "w"},
2920 {"mode": "wb"},
2921 {"mode": "wb", "buffering": 0},
2922 {"mode": "r"},
2923 {"mode": "rb"},
2924 {"mode": "rb", "buffering": 0},
2925 {"mode": "w+"},
2926 {"mode": "w+b"},
2927 {"mode": "w+b", "buffering": 0},
2928 ]:
2929 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2930 with self.open(support.TESTFN, **kwargs) as f:
2931 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2932
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002933 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2934 def test_nonblock_pipe_write_bigbuf(self):
2935 self._test_nonblock_pipe_write(16*1024)
2936
2937 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2938 def test_nonblock_pipe_write_smallbuf(self):
2939 self._test_nonblock_pipe_write(1024)
2940
2941 def _set_non_blocking(self, fd):
2942 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2943 self.assertNotEqual(flags, -1)
2944 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2945 self.assertEqual(res, 0)
2946
2947 def _test_nonblock_pipe_write(self, bufsize):
2948 sent = []
2949 received = []
2950 r, w = os.pipe()
2951 self._set_non_blocking(r)
2952 self._set_non_blocking(w)
2953
2954 # To exercise all code paths in the C implementation we need
2955 # to play with buffer sizes. For instance, if we choose a
2956 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2957 # then we will never get a partial write of the buffer.
2958 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2959 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2960
2961 with rf, wf:
2962 for N in 9999, 73, 7574:
2963 try:
2964 i = 0
2965 while True:
2966 msg = bytes([i % 26 + 97]) * N
2967 sent.append(msg)
2968 wf.write(msg)
2969 i += 1
2970
2971 except self.BlockingIOError as e:
2972 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01002973 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002974 sent[-1] = sent[-1][:e.characters_written]
2975 received.append(rf.read())
2976 msg = b'BLOCKED'
2977 wf.write(msg)
2978 sent.append(msg)
2979
2980 while True:
2981 try:
2982 wf.flush()
2983 break
2984 except self.BlockingIOError as e:
2985 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01002986 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002987 self.assertEqual(e.characters_written, 0)
2988 received.append(rf.read())
2989
2990 received += iter(rf.read, None)
2991
2992 sent, received = b''.join(sent), b''.join(received)
2993 self.assertTrue(sent == received)
2994 self.assertTrue(wf.closed)
2995 self.assertTrue(rf.closed)
2996
Charles-François Natalidc3044c2012-01-09 22:40:02 +01002997 def test_create_fail(self):
2998 # 'x' mode fails if file is existing
2999 with self.open(support.TESTFN, 'w'):
3000 pass
3001 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3002
3003 def test_create_writes(self):
3004 # 'x' mode opens for writing
3005 with self.open(support.TESTFN, 'xb') as f:
3006 f.write(b"spam")
3007 with self.open(support.TESTFN, 'rb') as f:
3008 self.assertEqual(b"spam", f.read())
3009
Christian Heimes7b648752012-09-10 14:48:43 +02003010 def test_open_allargs(self):
3011 # there used to be a buffer overflow in the parser for rawmode
3012 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3013
3014
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003015class CMiscIOTest(MiscIOTest):
3016 io = io
3017
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03003018 def test_readinto_buffer_overflow(self):
3019 # Issue #18025
3020 class BadReader(self.io.BufferedIOBase):
3021 def read(self, n=-1):
3022 return b'x' * 10**6
3023 bufio = BadReader()
3024 b = bytearray(2)
3025 self.assertRaises(ValueError, bufio.readinto, b)
3026
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003027class PyMiscIOTest(MiscIOTest):
3028 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003029
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003030
3031@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3032class SignalsTest(unittest.TestCase):
3033
3034 def setUp(self):
3035 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3036
3037 def tearDown(self):
3038 signal.signal(signal.SIGALRM, self.oldalrm)
3039
3040 def alarm_interrupt(self, sig, frame):
3041 1/0
3042
3043 @unittest.skipUnless(threading, 'Threading required for this test.')
3044 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3045 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003046 invokes the signal handler, and bubbles up the exception raised
3047 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003048 read_results = []
3049 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003050 if hasattr(signal, 'pthread_sigmask'):
3051 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003052 s = os.read(r, 1)
3053 read_results.append(s)
3054 t = threading.Thread(target=_read)
3055 t.daemon = True
3056 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003057 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003058 try:
3059 wio = self.io.open(w, **fdopen_kwargs)
3060 t.start()
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003061 signal.alarm(1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003062 # Fill the pipe enough that the write will be blocking.
3063 # It will be interrupted by the timer armed above. Since the
3064 # other thread has read one byte, the low-level write will
3065 # return with a successful (partial) result rather than an EINTR.
3066 # The buffered IO layer must check for pending signal
3067 # handlers, which in this case will invoke alarm_interrupt().
3068 self.assertRaises(ZeroDivisionError,
Antoine Pitroue1a16742013-04-24 23:31:38 +02003069 wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003070 t.join()
3071 # We got one byte, get another one and check that it isn't a
3072 # repeat of the first one.
3073 read_results.append(os.read(r, 1))
3074 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3075 finally:
3076 os.close(w)
3077 os.close(r)
3078 # This is deliberate. If we didn't close the file descriptor
3079 # before closing wio, wio would try to flush its internal
3080 # buffer, and block again.
3081 try:
3082 wio.close()
3083 except IOError as e:
3084 if e.errno != errno.EBADF:
3085 raise
3086
3087 def test_interrupted_write_unbuffered(self):
3088 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3089
3090 def test_interrupted_write_buffered(self):
3091 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3092
3093 def test_interrupted_write_text(self):
3094 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3095
Brett Cannon31f59292011-02-21 19:29:56 +00003096 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003097 def check_reentrant_write(self, data, **fdopen_kwargs):
3098 def on_alarm(*args):
3099 # Will be called reentrantly from the same thread
3100 wio.write(data)
3101 1/0
3102 signal.signal(signal.SIGALRM, on_alarm)
3103 r, w = os.pipe()
3104 wio = self.io.open(w, **fdopen_kwargs)
3105 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003106 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003107 # Either the reentrant call to wio.write() fails with RuntimeError,
3108 # or the signal handler raises ZeroDivisionError.
3109 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3110 while 1:
3111 for i in range(100):
3112 wio.write(data)
3113 wio.flush()
3114 # Make sure the buffer doesn't fill up and block further writes
3115 os.read(r, len(data) * 100)
3116 exc = cm.exception
3117 if isinstance(exc, RuntimeError):
3118 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3119 finally:
3120 wio.close()
3121 os.close(r)
3122
3123 def test_reentrant_write_buffered(self):
3124 self.check_reentrant_write(b"xy", mode="wb")
3125
3126 def test_reentrant_write_text(self):
3127 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3128
Antoine Pitrou707ce822011-02-25 21:24:11 +00003129 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3130 """Check that a buffered read, when it gets interrupted (either
3131 returning a partial result or EINTR), properly invokes the signal
3132 handler and retries if the latter returned successfully."""
3133 r, w = os.pipe()
3134 fdopen_kwargs["closefd"] = False
3135 def alarm_handler(sig, frame):
3136 os.write(w, b"bar")
3137 signal.signal(signal.SIGALRM, alarm_handler)
3138 try:
3139 rio = self.io.open(r, **fdopen_kwargs)
3140 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003141 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003142 # Expected behaviour:
3143 # - first raw read() returns partial b"foo"
3144 # - second raw read() returns EINTR
3145 # - third raw read() returns b"bar"
3146 self.assertEqual(decode(rio.read(6)), "foobar")
3147 finally:
3148 rio.close()
3149 os.close(w)
3150 os.close(r)
3151
Antoine Pitrou20db5112011-08-19 20:32:34 +02003152 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003153 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3154 mode="rb")
3155
Antoine Pitrou20db5112011-08-19 20:32:34 +02003156 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003157 self.check_interrupted_read_retry(lambda x: x,
3158 mode="r")
3159
3160 @unittest.skipUnless(threading, 'Threading required for this test.')
3161 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3162 """Check that a buffered write, when it gets interrupted (either
3163 returning a partial result or EINTR), properly invokes the signal
3164 handler and retries if the latter returned successfully."""
3165 select = support.import_module("select")
3166 # A quantity that exceeds the buffer size of an anonymous pipe's
3167 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02003168 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00003169 r, w = os.pipe()
3170 fdopen_kwargs["closefd"] = False
3171 # We need a separate thread to read from the pipe and allow the
3172 # write() to finish. This thread is started after the SIGALRM is
3173 # received (forcing a first EINTR in write()).
3174 read_results = []
3175 write_finished = False
3176 def _read():
3177 while not write_finished:
3178 while r in select.select([r], [], [], 1.0)[0]:
3179 s = os.read(r, 1024)
3180 read_results.append(s)
3181 t = threading.Thread(target=_read)
3182 t.daemon = True
3183 def alarm1(sig, frame):
3184 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003185 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003186 def alarm2(sig, frame):
3187 t.start()
3188 signal.signal(signal.SIGALRM, alarm1)
3189 try:
3190 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003191 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003192 # Expected behaviour:
3193 # - first raw write() is partial (because of the limited pipe buffer
3194 # and the first alarm)
3195 # - second raw write() returns EINTR (because of the second alarm)
3196 # - subsequent write()s are successful (either partial or complete)
3197 self.assertEqual(N, wio.write(item * N))
3198 wio.flush()
3199 write_finished = True
3200 t.join()
3201 self.assertEqual(N, sum(len(x) for x in read_results))
3202 finally:
3203 write_finished = True
3204 os.close(w)
3205 os.close(r)
3206 # This is deliberate. If we didn't close the file descriptor
3207 # before closing wio, wio would try to flush its internal
3208 # buffer, and could block (in case of failure).
3209 try:
3210 wio.close()
3211 except IOError as e:
3212 if e.errno != errno.EBADF:
3213 raise
3214
Antoine Pitrou20db5112011-08-19 20:32:34 +02003215 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003216 self.check_interrupted_write_retry(b"x", mode="wb")
3217
Antoine Pitrou20db5112011-08-19 20:32:34 +02003218 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003219 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3220
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003221
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003222class CSignalsTest(SignalsTest):
3223 io = io
3224
3225class PySignalsTest(SignalsTest):
3226 io = pyio
3227
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003228 # Handling reentrancy issues would slow down _pyio even more, so the
3229 # tests are disabled.
3230 test_reentrant_write_buffered = None
3231 test_reentrant_write_text = None
3232
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003233
Ezio Melottidaa42c72013-03-23 16:30:16 +02003234def load_tests(*args):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003235 tests = (CIOTest, PyIOTest,
3236 CBufferedReaderTest, PyBufferedReaderTest,
3237 CBufferedWriterTest, PyBufferedWriterTest,
3238 CBufferedRWPairTest, PyBufferedRWPairTest,
3239 CBufferedRandomTest, PyBufferedRandomTest,
3240 StatefulIncrementalDecoderTest,
3241 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3242 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003243 CMiscIOTest, PyMiscIOTest,
3244 CSignalsTest, PySignalsTest,
3245 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003246
3247 # Put the namespaces of the IO module we are testing and some useful mock
3248 # classes in the __dict__ of each test.
3249 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003250 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003251 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3252 c_io_ns = {name : getattr(io, name) for name in all_members}
3253 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3254 globs = globals()
3255 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3256 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3257 # Avoid turning open into a bound method.
3258 py_io_ns["open"] = pyio.OpenWrapper
3259 for test in tests:
3260 if test.__name__.startswith("C"):
3261 for name, obj in c_io_ns.items():
3262 setattr(test, name, obj)
3263 elif test.__name__.startswith("Py"):
3264 for name, obj in py_io_ns.items():
3265 setattr(test, name, obj)
3266
Ezio Melottidaa42c72013-03-23 16:30:16 +02003267 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
3268 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003269
3270if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02003271 unittest.main()