blob: 9b8920211e32ddfba3ded27ee8b4fb34f3a643e7 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Serhiy Storchaka441d30f2013-01-19 12:26:26 +020035import _testcapi
Antoine Pitrou131a4892012-10-16 22:57:11 +020036from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000038from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010047try:
48 import fcntl
49except ImportError:
50 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000052def _default_chunk_size():
53 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000054 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000055 return f._CHUNK_SIZE
56
57
Antoine Pitrou328ec742010-09-14 18:37:24 +000058class MockRawIOWithoutRead:
59 """A RawIO implementation without read(), so as to exercise the default
60 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000061
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000062 def __init__(self, read_stack=()):
63 self._read_stack = list(read_stack)
64 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000065 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000066 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000067
Guido van Rossum01a27522007-03-07 01:00:12 +000068 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000069 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000070 return len(b)
71
72 def writable(self):
73 return True
74
Guido van Rossum68bbcd22007-02-27 17:19:33 +000075 def fileno(self):
76 return 42
77
78 def readable(self):
79 return True
80
Guido van Rossum01a27522007-03-07 01:00:12 +000081 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000082 return True
83
Guido van Rossum01a27522007-03-07 01:00:12 +000084 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000085 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000086
87 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000088 return 0 # same comment as above
89
90 def readinto(self, buf):
91 self._reads += 1
92 max_len = len(buf)
93 try:
94 data = self._read_stack[0]
95 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000096 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000097 return 0
98 if data is None:
99 del self._read_stack[0]
100 return None
101 n = len(data)
102 if len(data) <= max_len:
103 del self._read_stack[0]
104 buf[:n] = data
105 return n
106 else:
107 buf[:] = data[:max_len]
108 self._read_stack[0] = data[max_len:]
109 return max_len
110
111 def truncate(self, pos=None):
112 return pos
113
Antoine Pitrou328ec742010-09-14 18:37:24 +0000114class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
115 pass
116
117class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
118 pass
119
120
121class MockRawIO(MockRawIOWithoutRead):
122
123 def read(self, n=None):
124 self._reads += 1
125 try:
126 return self._read_stack.pop(0)
127 except:
128 self._extraneous_reads += 1
129 return b""
130
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000131class CMockRawIO(MockRawIO, io.RawIOBase):
132 pass
133
134class PyMockRawIO(MockRawIO, pyio.RawIOBase):
135 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000136
Guido van Rossuma9e20242007-03-08 00:43:48 +0000137
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000138class MisbehavedRawIO(MockRawIO):
139 def write(self, b):
140 return super().write(b) * 2
141
142 def read(self, n=None):
143 return super().read(n) * 2
144
145 def seek(self, pos, whence):
146 return -123
147
148 def tell(self):
149 return -456
150
151 def readinto(self, buf):
152 super().readinto(buf)
153 return len(buf) * 5
154
155class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
156 pass
157
158class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
159 pass
160
161
162class CloseFailureIO(MockRawIO):
163 closed = 0
164
165 def close(self):
166 if not self.closed:
167 self.closed = 1
168 raise IOError
169
170class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
171 pass
172
173class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
174 pass
175
176
177class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000178
179 def __init__(self, data):
180 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000181 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000182
183 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000184 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000185 self.read_history.append(None if res is None else len(res))
186 return res
187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188 def readinto(self, b):
189 res = super().readinto(b)
190 self.read_history.append(res)
191 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000192
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000193class CMockFileIO(MockFileIO, io.BytesIO):
194 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000195
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000196class PyMockFileIO(MockFileIO, pyio.BytesIO):
197 pass
198
199
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000200class MockUnseekableIO:
201 def seekable(self):
202 return False
203
204 def seek(self, *args):
205 raise self.UnsupportedOperation("not seekable")
206
207 def tell(self, *args):
208 raise self.UnsupportedOperation("not seekable")
209
210class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
211 UnsupportedOperation = io.UnsupportedOperation
212
213class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
214 UnsupportedOperation = pyio.UnsupportedOperation
215
216
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000217class MockNonBlockWriterIO:
218
219 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000220 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000221 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000222
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000223 def pop_written(self):
224 s = b"".join(self._write_stack)
225 self._write_stack[:] = []
226 return s
227
228 def block_on(self, char):
229 """Block when a given char is encountered."""
230 self._blocker_char = char
231
232 def readable(self):
233 return True
234
235 def seekable(self):
236 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
Guido van Rossum01a27522007-03-07 01:00:12 +0000238 def writable(self):
239 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000240
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000241 def write(self, b):
242 b = bytes(b)
243 n = -1
244 if self._blocker_char:
245 try:
246 n = b.index(self._blocker_char)
247 except ValueError:
248 pass
249 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100250 if n > 0:
251 # write data up to the first blocker
252 self._write_stack.append(b[:n])
253 return n
254 else:
255 # cancel blocker and indicate would block
256 self._blocker_char = None
257 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000258 self._write_stack.append(b)
259 return len(b)
260
261class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
262 BlockingIOError = io.BlockingIOError
263
264class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
265 BlockingIOError = pyio.BlockingIOError
266
Guido van Rossuma9e20242007-03-08 00:43:48 +0000267
Guido van Rossum28524c72007-02-27 05:47:44 +0000268class IOTest(unittest.TestCase):
269
Neal Norwitze7789b12008-03-24 06:18:09 +0000270 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000271 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000272
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000273 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000274 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000275
Guido van Rossum28524c72007-02-27 05:47:44 +0000276 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000277 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000278 f.truncate(0)
279 self.assertEqual(f.tell(), 5)
280 f.seek(0)
281
282 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000283 self.assertEqual(f.seek(0), 0)
284 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000285 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000286 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000287 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000288 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000289 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000290 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000291 self.assertEqual(f.seek(-1, 2), 13)
292 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000293
Guido van Rossum87429772007-04-10 21:06:59 +0000294 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000295 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000296 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000297
Guido van Rossum9b76da62007-04-11 01:09:03 +0000298 def read_ops(self, f, buffered=False):
299 data = f.read(5)
300 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000301 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000302 self.assertEqual(f.readinto(data), 5)
303 self.assertEqual(data, b" worl")
304 self.assertEqual(f.readinto(data), 2)
305 self.assertEqual(len(data), 5)
306 self.assertEqual(data[:2], b"d\n")
307 self.assertEqual(f.seek(0), 0)
308 self.assertEqual(f.read(20), b"hello world\n")
309 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000310 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000311 self.assertEqual(f.seek(-6, 2), 6)
312 self.assertEqual(f.read(5), b"world")
313 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000314 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000315 self.assertEqual(f.seek(-6, 1), 5)
316 self.assertEqual(f.read(5), b" worl")
317 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000318 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000319 if buffered:
320 f.seek(0)
321 self.assertEqual(f.read(), b"hello world\n")
322 f.seek(6)
323 self.assertEqual(f.read(), b"world\n")
324 self.assertEqual(f.read(), b"")
325
Guido van Rossum34d69e52007-04-10 20:08:41 +0000326 LARGE = 2**31
327
Guido van Rossum53807da2007-04-10 19:01:47 +0000328 def large_file_ops(self, f):
329 assert f.readable()
330 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000331 self.assertEqual(f.seek(self.LARGE), self.LARGE)
332 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000333 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000334 self.assertEqual(f.tell(), self.LARGE + 3)
335 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000336 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000337 self.assertEqual(f.tell(), self.LARGE + 2)
338 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000339 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000340 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000341 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
342 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000343 self.assertEqual(f.read(2), b"x")
344
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000345 def test_invalid_operations(self):
346 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000347 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000348 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000349 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000350 self.assertRaises(exc, fp.read)
351 self.assertRaises(exc, fp.readline)
352 with self.open(support.TESTFN, "wb", buffering=0) as fp:
353 self.assertRaises(exc, fp.read)
354 self.assertRaises(exc, fp.readline)
355 with self.open(support.TESTFN, "rb", buffering=0) as fp:
356 self.assertRaises(exc, fp.write, b"blah")
357 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000358 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000359 self.assertRaises(exc, fp.write, b"blah")
360 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000361 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000362 self.assertRaises(exc, fp.write, "blah")
363 self.assertRaises(exc, fp.writelines, ["blah\n"])
364 # Non-zero seeking from current or end pos
365 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
366 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000367
Antoine Pitrou13348842012-01-29 18:36:34 +0100368 def test_open_handles_NUL_chars(self):
369 fn_with_NUL = 'foo\0bar'
370 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
371 self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
372
Guido van Rossum28524c72007-02-27 05:47:44 +0000373 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000374 with self.open(support.TESTFN, "wb", buffering=0) as f:
375 self.assertEqual(f.readable(), False)
376 self.assertEqual(f.writable(), True)
377 self.assertEqual(f.seekable(), True)
378 self.write_ops(f)
379 with self.open(support.TESTFN, "rb", buffering=0) as f:
380 self.assertEqual(f.readable(), True)
381 self.assertEqual(f.writable(), False)
382 self.assertEqual(f.seekable(), True)
383 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000384
Guido van Rossum87429772007-04-10 21:06:59 +0000385 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000386 with self.open(support.TESTFN, "wb") as f:
387 self.assertEqual(f.readable(), False)
388 self.assertEqual(f.writable(), True)
389 self.assertEqual(f.seekable(), True)
390 self.write_ops(f)
391 with self.open(support.TESTFN, "rb") as f:
392 self.assertEqual(f.readable(), True)
393 self.assertEqual(f.writable(), False)
394 self.assertEqual(f.seekable(), True)
395 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000396
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000397 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000398 with self.open(support.TESTFN, "wb") as f:
399 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
400 with self.open(support.TESTFN, "rb") as f:
401 self.assertEqual(f.readline(), b"abc\n")
402 self.assertEqual(f.readline(10), b"def\n")
403 self.assertEqual(f.readline(2), b"xy")
404 self.assertEqual(f.readline(4), b"zzy\n")
405 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000406 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000407 self.assertRaises(TypeError, f.readline, 5.3)
408 with self.open(support.TESTFN, "r") as f:
409 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000410
Guido van Rossum28524c72007-02-27 05:47:44 +0000411 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000412 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000413 self.write_ops(f)
414 data = f.getvalue()
415 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000416 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000417 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000418
Guido van Rossum53807da2007-04-10 19:01:47 +0000419 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000420 # On Windows and Mac OSX this test comsumes large resources; It takes
421 # a long time to build the >2GB file and takes >2GB of disk space
422 # therefore the resource must be enabled to run this test.
423 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000424 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000425 print("\nTesting large file ops skipped on %s." % sys.platform,
426 file=sys.stderr)
427 print("It requires %d bytes and a long time." % self.LARGE,
428 file=sys.stderr)
429 print("Use 'regrtest.py -u largefile test_io' to run it.",
430 file=sys.stderr)
431 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000432 with self.open(support.TESTFN, "w+b", 0) as f:
433 self.large_file_ops(f)
434 with self.open(support.TESTFN, "w+b") as f:
435 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000436
437 def test_with_open(self):
438 for bufsize in (0, 1, 100):
439 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000440 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000441 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000442 self.assertEqual(f.closed, True)
443 f = None
444 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000445 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000446 1/0
447 except ZeroDivisionError:
448 self.assertEqual(f.closed, True)
449 else:
450 self.fail("1/0 didn't raise an exception")
451
Antoine Pitrou08838b62009-01-21 00:55:13 +0000452 # issue 5008
453 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000454 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000455 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000456 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000457 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000458 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000459 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000460 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000461 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000462
Guido van Rossum87429772007-04-10 21:06:59 +0000463 def test_destructor(self):
464 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000465 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000466 def __del__(self):
467 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000468 try:
469 f = super().__del__
470 except AttributeError:
471 pass
472 else:
473 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000474 def close(self):
475 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000476 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000477 def flush(self):
478 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000479 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000480 with support.check_warnings(('', ResourceWarning)):
481 f = MyFileIO(support.TESTFN, "wb")
482 f.write(b"xxx")
483 del f
484 support.gc_collect()
485 self.assertEqual(record, [1, 2, 3])
486 with self.open(support.TESTFN, "rb") as f:
487 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000488
489 def _check_base_destructor(self, base):
490 record = []
491 class MyIO(base):
492 def __init__(self):
493 # This exercises the availability of attributes on object
494 # destruction.
495 # (in the C version, close() is called by the tp_dealloc
496 # function, not by __del__)
497 self.on_del = 1
498 self.on_close = 2
499 self.on_flush = 3
500 def __del__(self):
501 record.append(self.on_del)
502 try:
503 f = super().__del__
504 except AttributeError:
505 pass
506 else:
507 f()
508 def close(self):
509 record.append(self.on_close)
510 super().close()
511 def flush(self):
512 record.append(self.on_flush)
513 super().flush()
514 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000515 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000516 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000517 self.assertEqual(record, [1, 2, 3])
518
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000519 def test_IOBase_destructor(self):
520 self._check_base_destructor(self.IOBase)
521
522 def test_RawIOBase_destructor(self):
523 self._check_base_destructor(self.RawIOBase)
524
525 def test_BufferedIOBase_destructor(self):
526 self._check_base_destructor(self.BufferedIOBase)
527
528 def test_TextIOBase_destructor(self):
529 self._check_base_destructor(self.TextIOBase)
530
Guido van Rossum87429772007-04-10 21:06:59 +0000531 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000532 with self.open(support.TESTFN, "wb") as f:
533 f.write(b"xxx")
534 with self.open(support.TESTFN, "rb") as f:
535 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000536
Guido van Rossumd4103952007-04-12 05:44:49 +0000537 def test_array_writes(self):
538 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000539 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000540 with self.open(support.TESTFN, "wb", 0) as f:
541 self.assertEqual(f.write(a), n)
542 with self.open(support.TESTFN, "wb") as f:
543 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000544
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000545 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000546 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000547 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000548
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000549 def test_read_closed(self):
550 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000551 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000552 with self.open(support.TESTFN, "r") as f:
553 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000554 self.assertEqual(file.read(), "egg\n")
555 file.seek(0)
556 file.close()
557 self.assertRaises(ValueError, file.read)
558
559 def test_no_closefd_with_filename(self):
560 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000561 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000562
563 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000564 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000565 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000566 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000567 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000568 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000569 self.assertEqual(file.buffer.raw.closefd, False)
570
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000571 def test_garbage_collection(self):
572 # FileIO objects are collected, and collecting them flushes
573 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000574 with support.check_warnings(('', ResourceWarning)):
575 f = self.FileIO(support.TESTFN, "wb")
576 f.write(b"abcxxx")
577 f.f = f
578 wr = weakref.ref(f)
579 del f
580 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000581 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000582 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000583 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000584
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000585 def test_unbounded_file(self):
586 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
587 zero = "/dev/zero"
588 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000589 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000590 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000591 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000592 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000593 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000594 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000595 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000596 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000597 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000598 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000599 self.assertRaises(OverflowError, f.read)
600
Antoine Pitrou6be88762010-05-03 16:48:20 +0000601 def test_flush_error_on_close(self):
602 f = self.open(support.TESTFN, "wb", buffering=0)
603 def bad_flush():
604 raise IOError()
605 f.flush = bad_flush
606 self.assertRaises(IOError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600607 self.assertTrue(f.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000608
609 def test_multi_close(self):
610 f = self.open(support.TESTFN, "wb", buffering=0)
611 f.close()
612 f.close()
613 f.close()
614 self.assertRaises(ValueError, f.flush)
615
Antoine Pitrou328ec742010-09-14 18:37:24 +0000616 def test_RawIOBase_read(self):
617 # Exercise the default RawIOBase.read() implementation (which calls
618 # readinto() internally).
619 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
620 self.assertEqual(rawio.read(2), b"ab")
621 self.assertEqual(rawio.read(2), b"c")
622 self.assertEqual(rawio.read(2), b"d")
623 self.assertEqual(rawio.read(2), None)
624 self.assertEqual(rawio.read(2), b"ef")
625 self.assertEqual(rawio.read(2), b"g")
626 self.assertEqual(rawio.read(2), None)
627 self.assertEqual(rawio.read(2), b"")
628
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400629 def test_types_have_dict(self):
630 test = (
631 self.IOBase(),
632 self.RawIOBase(),
633 self.TextIOBase(),
634 self.StringIO(),
635 self.BytesIO()
636 )
637 for obj in test:
638 self.assertTrue(hasattr(obj, "__dict__"))
639
Ross Lagerwall59142db2011-10-31 20:34:46 +0200640 def test_opener(self):
641 with self.open(support.TESTFN, "w") as f:
642 f.write("egg\n")
643 fd = os.open(support.TESTFN, os.O_RDONLY)
644 def opener(path, flags):
645 return fd
646 with self.open("non-existent", "r", opener=opener) as f:
647 self.assertEqual(f.read(), "egg\n")
648
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200649 def test_fileio_closefd(self):
650 # Issue #4841
651 with self.open(__file__, 'rb') as f1, \
652 self.open(__file__, 'rb') as f2:
653 fileio = self.FileIO(f1.fileno(), closefd=False)
654 # .__init__() must not close f1
655 fileio.__init__(f2.fileno(), closefd=False)
656 f1.readline()
657 # .close() must not close f2
658 fileio.close()
659 f2.readline()
660
661
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000662class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200663
664 def test_IOBase_finalize(self):
665 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
666 # class which inherits IOBase and an object of this class are caught
667 # in a reference cycle and close() is already in the method cache.
668 class MyIO(self.IOBase):
669 def close(self):
670 pass
671
672 # create an instance to populate the method cache
673 MyIO()
674 obj = MyIO()
675 obj.obj = obj
676 wr = weakref.ref(obj)
677 del MyIO
678 del obj
679 support.gc_collect()
680 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000681
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000682class PyIOTest(IOTest):
683 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000684
Guido van Rossuma9e20242007-03-08 00:43:48 +0000685
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000686class CommonBufferedTests:
687 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
688
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000689 def test_detach(self):
690 raw = self.MockRawIO()
691 buf = self.tp(raw)
692 self.assertIs(buf.detach(), raw)
693 self.assertRaises(ValueError, buf.detach)
694
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000695 def test_fileno(self):
696 rawio = self.MockRawIO()
697 bufio = self.tp(rawio)
698
Ezio Melottib3aedd42010-11-20 19:04:17 +0000699 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000700
701 def test_no_fileno(self):
702 # XXX will we always have fileno() function? If so, kill
703 # this test. Else, write it.
704 pass
705
706 def test_invalid_args(self):
707 rawio = self.MockRawIO()
708 bufio = self.tp(rawio)
709 # Invalid whence
710 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200711 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000712
713 def test_override_destructor(self):
714 tp = self.tp
715 record = []
716 class MyBufferedIO(tp):
717 def __del__(self):
718 record.append(1)
719 try:
720 f = super().__del__
721 except AttributeError:
722 pass
723 else:
724 f()
725 def close(self):
726 record.append(2)
727 super().close()
728 def flush(self):
729 record.append(3)
730 super().flush()
731 rawio = self.MockRawIO()
732 bufio = MyBufferedIO(rawio)
733 writable = bufio.writable()
734 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000735 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000736 if writable:
737 self.assertEqual(record, [1, 2, 3])
738 else:
739 self.assertEqual(record, [1, 2])
740
741 def test_context_manager(self):
742 # Test usability as a context manager
743 rawio = self.MockRawIO()
744 bufio = self.tp(rawio)
745 def _with():
746 with bufio:
747 pass
748 _with()
749 # bufio should now be closed, and using it a second time should raise
750 # a ValueError.
751 self.assertRaises(ValueError, _with)
752
753 def test_error_through_destructor(self):
754 # Test that the exception state is not modified by a destructor,
755 # even if close() fails.
756 rawio = self.CloseFailureIO()
757 def f():
758 self.tp(rawio).xyzzy
759 with support.captured_output("stderr") as s:
760 self.assertRaises(AttributeError, f)
761 s = s.getvalue().strip()
762 if s:
763 # The destructor *may* have printed an unraisable error, check it
764 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000765 self.assertTrue(s.startswith("Exception IOError: "), s)
766 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000767
Antoine Pitrou716c4442009-05-23 19:04:03 +0000768 def test_repr(self):
769 raw = self.MockRawIO()
770 b = self.tp(raw)
771 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
772 self.assertEqual(repr(b), "<%s>" % clsname)
773 raw.name = "dummy"
774 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
775 raw.name = b"dummy"
776 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
777
Antoine Pitrou6be88762010-05-03 16:48:20 +0000778 def test_flush_error_on_close(self):
779 raw = self.MockRawIO()
780 def bad_flush():
781 raise IOError()
782 raw.flush = bad_flush
783 b = self.tp(raw)
784 self.assertRaises(IOError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600785 self.assertTrue(b.closed)
786
787 def test_close_error_on_close(self):
788 raw = self.MockRawIO()
789 def bad_flush():
790 raise IOError('flush')
791 def bad_close():
792 raise IOError('close')
793 raw.close = bad_close
794 b = self.tp(raw)
795 b.flush = bad_flush
796 with self.assertRaises(IOError) as err: # exception not swallowed
797 b.close()
798 self.assertEqual(err.exception.args, ('close',))
799 self.assertEqual(err.exception.__context__.args, ('flush',))
800 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000801
802 def test_multi_close(self):
803 raw = self.MockRawIO()
804 b = self.tp(raw)
805 b.close()
806 b.close()
807 b.close()
808 self.assertRaises(ValueError, b.flush)
809
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000810 def test_unseekable(self):
811 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
812 self.assertRaises(self.UnsupportedOperation, bufio.tell)
813 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
814
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000815 def test_readonly_attributes(self):
816 raw = self.MockRawIO()
817 buf = self.tp(raw)
818 x = self.MockRawIO()
819 with self.assertRaises(AttributeError):
820 buf.raw = x
821
Guido van Rossum78892e42007-04-06 17:31:18 +0000822
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200823class SizeofTest:
824
825 @support.cpython_only
826 def test_sizeof(self):
827 bufsize1 = 4096
828 bufsize2 = 8192
829 rawio = self.MockRawIO()
830 bufio = self.tp(rawio, buffer_size=bufsize1)
831 size = sys.getsizeof(bufio) - bufsize1
832 rawio = self.MockRawIO()
833 bufio = self.tp(rawio, buffer_size=bufsize2)
834 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
835
836
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000837class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
838 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000839
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000840 def test_constructor(self):
841 rawio = self.MockRawIO([b"abc"])
842 bufio = self.tp(rawio)
843 bufio.__init__(rawio)
844 bufio.__init__(rawio, buffer_size=1024)
845 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000846 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000847 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
848 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
849 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
850 rawio = self.MockRawIO([b"abc"])
851 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000852 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000853
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000854 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000855 for arg in (None, 7):
856 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
857 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000858 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000859 # Invalid args
860 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000861
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000862 def test_read1(self):
863 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
864 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000865 self.assertEqual(b"a", bufio.read(1))
866 self.assertEqual(b"b", bufio.read1(1))
867 self.assertEqual(rawio._reads, 1)
868 self.assertEqual(b"c", bufio.read1(100))
869 self.assertEqual(rawio._reads, 1)
870 self.assertEqual(b"d", bufio.read1(100))
871 self.assertEqual(rawio._reads, 2)
872 self.assertEqual(b"efg", bufio.read1(100))
873 self.assertEqual(rawio._reads, 3)
874 self.assertEqual(b"", bufio.read1(100))
875 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000876 # Invalid args
877 self.assertRaises(ValueError, bufio.read1, -1)
878
879 def test_readinto(self):
880 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
881 bufio = self.tp(rawio)
882 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000883 self.assertEqual(bufio.readinto(b), 2)
884 self.assertEqual(b, b"ab")
885 self.assertEqual(bufio.readinto(b), 2)
886 self.assertEqual(b, b"cd")
887 self.assertEqual(bufio.readinto(b), 2)
888 self.assertEqual(b, b"ef")
889 self.assertEqual(bufio.readinto(b), 1)
890 self.assertEqual(b, b"gf")
891 self.assertEqual(bufio.readinto(b), 0)
892 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +0200893 rawio = self.MockRawIO((b"abc", None))
894 bufio = self.tp(rawio)
895 self.assertEqual(bufio.readinto(b), 2)
896 self.assertEqual(b, b"ab")
897 self.assertEqual(bufio.readinto(b), 1)
898 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000899
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000900 def test_readlines(self):
901 def bufio():
902 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
903 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000904 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
905 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
906 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000907
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000908 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000909 data = b"abcdefghi"
910 dlen = len(data)
911
912 tests = [
913 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
914 [ 100, [ 3, 3, 3], [ dlen ] ],
915 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
916 ]
917
918 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000919 rawio = self.MockFileIO(data)
920 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000921 pos = 0
922 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000923 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000924 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000925 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000926 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000927
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000928 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000929 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000930 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
931 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000932 self.assertEqual(b"abcd", bufio.read(6))
933 self.assertEqual(b"e", bufio.read(1))
934 self.assertEqual(b"fg", bufio.read())
935 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200936 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000937 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000938
Victor Stinnera80987f2011-05-25 22:47:16 +0200939 rawio = self.MockRawIO((b"a", None, None))
940 self.assertEqual(b"a", rawio.readall())
941 self.assertIsNone(rawio.readall())
942
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000943 def test_read_past_eof(self):
944 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
945 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000946
Ezio Melottib3aedd42010-11-20 19:04:17 +0000947 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000948
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000949 def test_read_all(self):
950 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
951 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000952
Ezio Melottib3aedd42010-11-20 19:04:17 +0000953 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000954
Victor Stinner45df8202010-04-28 22:31:17 +0000955 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000956 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000957 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000958 try:
959 # Write out many bytes with exactly the same number of 0's,
960 # 1's... 255's. This will help us check that concurrent reading
961 # doesn't duplicate or forget contents.
962 N = 1000
963 l = list(range(256)) * N
964 random.shuffle(l)
965 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000966 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000967 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000968 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000969 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000970 errors = []
971 results = []
972 def f():
973 try:
974 # Intra-buffer read then buffer-flushing read
975 for n in cycle([1, 19]):
976 s = bufio.read(n)
977 if not s:
978 break
979 # list.append() is atomic
980 results.append(s)
981 except Exception as e:
982 errors.append(e)
983 raise
984 threads = [threading.Thread(target=f) for x in range(20)]
985 for t in threads:
986 t.start()
987 time.sleep(0.02) # yield
988 for t in threads:
989 t.join()
990 self.assertFalse(errors,
991 "the following exceptions were caught: %r" % errors)
992 s = b''.join(results)
993 for i in range(256):
994 c = bytes(bytearray([i]))
995 self.assertEqual(s.count(c), N)
996 finally:
997 support.unlink(support.TESTFN)
998
Antoine Pitrou1e44fec2011-10-04 12:26:20 +0200999 def test_unseekable(self):
1000 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1001 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1002 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1003 bufio.read(1)
1004 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1005 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1006
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001007 def test_misbehaved_io(self):
1008 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1009 bufio = self.tp(rawio)
1010 self.assertRaises(IOError, bufio.seek, 0)
1011 self.assertRaises(IOError, bufio.tell)
1012
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001013 def test_no_extraneous_read(self):
1014 # Issue #9550; when the raw IO object has satisfied the read request,
1015 # we should not issue any additional reads, otherwise it may block
1016 # (e.g. socket).
1017 bufsize = 16
1018 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1019 rawio = self.MockRawIO([b"x" * n])
1020 bufio = self.tp(rawio, bufsize)
1021 self.assertEqual(bufio.read(n), b"x" * n)
1022 # Simple case: one raw read is enough to satisfy the request.
1023 self.assertEqual(rawio._extraneous_reads, 0,
1024 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1025 # A more complex case where two raw reads are needed to satisfy
1026 # the request.
1027 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1028 bufio = self.tp(rawio, bufsize)
1029 self.assertEqual(bufio.read(n), b"x" * n)
1030 self.assertEqual(rawio._extraneous_reads, 0,
1031 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1032
1033
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001034class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001035 tp = io.BufferedReader
1036
1037 def test_constructor(self):
1038 BufferedReaderTest.test_constructor(self)
1039 # The allocation can succeed on 32-bit builds, e.g. with more
1040 # than 2GB RAM and a 64-bit kernel.
1041 if sys.maxsize > 0x7FFFFFFF:
1042 rawio = self.MockRawIO()
1043 bufio = self.tp(rawio)
1044 self.assertRaises((OverflowError, MemoryError, ValueError),
1045 bufio.__init__, rawio, sys.maxsize)
1046
1047 def test_initialization(self):
1048 rawio = self.MockRawIO([b"abc"])
1049 bufio = self.tp(rawio)
1050 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1051 self.assertRaises(ValueError, bufio.read)
1052 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1053 self.assertRaises(ValueError, bufio.read)
1054 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1055 self.assertRaises(ValueError, bufio.read)
1056
1057 def test_misbehaved_io_read(self):
1058 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1059 bufio = self.tp(rawio)
1060 # _pyio.BufferedReader seems to implement reading different, so that
1061 # checking this is not so easy.
1062 self.assertRaises(IOError, bufio.read, 10)
1063
1064 def test_garbage_collection(self):
1065 # C BufferedReader objects are collected.
1066 # The Python version has __del__, so it ends into gc.garbage instead
1067 rawio = self.FileIO(support.TESTFN, "w+b")
1068 f = self.tp(rawio)
1069 f.f = f
1070 wr = weakref.ref(f)
1071 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +00001072 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001073 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001074
R David Murray67bfe802013-02-23 21:51:05 -05001075 def test_args_error(self):
1076 # Issue #17275
1077 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1078 self.tp(io.BytesIO(), 1024, 1024, 1024)
1079
1080
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001081class PyBufferedReaderTest(BufferedReaderTest):
1082 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001083
Guido van Rossuma9e20242007-03-08 00:43:48 +00001084
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001085class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1086 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001087
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001088 def test_constructor(self):
1089 rawio = self.MockRawIO()
1090 bufio = self.tp(rawio)
1091 bufio.__init__(rawio)
1092 bufio.__init__(rawio, buffer_size=1024)
1093 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001094 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001095 bufio.flush()
1096 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1097 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1098 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1099 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001100 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001101 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001102 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001103
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001104 def test_detach_flush(self):
1105 raw = self.MockRawIO()
1106 buf = self.tp(raw)
1107 buf.write(b"howdy!")
1108 self.assertFalse(raw._write_stack)
1109 buf.detach()
1110 self.assertEqual(raw._write_stack, [b"howdy!"])
1111
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001112 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001113 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001114 writer = self.MockRawIO()
1115 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001116 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001117 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001118
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001119 def test_write_overflow(self):
1120 writer = self.MockRawIO()
1121 bufio = self.tp(writer, 8)
1122 contents = b"abcdefghijklmnop"
1123 for n in range(0, len(contents), 3):
1124 bufio.write(contents[n:n+3])
1125 flushed = b"".join(writer._write_stack)
1126 # At least (total - 8) bytes were implicitly flushed, perhaps more
1127 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001128 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001129
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001130 def check_writes(self, intermediate_func):
1131 # Lots of writes, test the flushed output is as expected.
1132 contents = bytes(range(256)) * 1000
1133 n = 0
1134 writer = self.MockRawIO()
1135 bufio = self.tp(writer, 13)
1136 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1137 def gen_sizes():
1138 for size in count(1):
1139 for i in range(15):
1140 yield size
1141 sizes = gen_sizes()
1142 while n < len(contents):
1143 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001144 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001145 intermediate_func(bufio)
1146 n += size
1147 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001148 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001149
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001150 def test_writes(self):
1151 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001152
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001153 def test_writes_and_flushes(self):
1154 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001155
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001156 def test_writes_and_seeks(self):
1157 def _seekabs(bufio):
1158 pos = bufio.tell()
1159 bufio.seek(pos + 1, 0)
1160 bufio.seek(pos - 1, 0)
1161 bufio.seek(pos, 0)
1162 self.check_writes(_seekabs)
1163 def _seekrel(bufio):
1164 pos = bufio.seek(0, 1)
1165 bufio.seek(+1, 1)
1166 bufio.seek(-1, 1)
1167 bufio.seek(pos, 0)
1168 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001169
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001170 def test_writes_and_truncates(self):
1171 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001172
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001173 def test_write_non_blocking(self):
1174 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001175 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001176
Ezio Melottib3aedd42010-11-20 19:04:17 +00001177 self.assertEqual(bufio.write(b"abcd"), 4)
1178 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001179 # 1 byte will be written, the rest will be buffered
1180 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001181 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001182
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001183 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1184 raw.block_on(b"0")
1185 try:
1186 bufio.write(b"opqrwxyz0123456789")
1187 except self.BlockingIOError as e:
1188 written = e.characters_written
1189 else:
1190 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001191 self.assertEqual(written, 16)
1192 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001193 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001194
Ezio Melottib3aedd42010-11-20 19:04:17 +00001195 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001196 s = raw.pop_written()
1197 # Previously buffered bytes were flushed
1198 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001200 def test_write_and_rewind(self):
1201 raw = io.BytesIO()
1202 bufio = self.tp(raw, 4)
1203 self.assertEqual(bufio.write(b"abcdef"), 6)
1204 self.assertEqual(bufio.tell(), 6)
1205 bufio.seek(0, 0)
1206 self.assertEqual(bufio.write(b"XY"), 2)
1207 bufio.seek(6, 0)
1208 self.assertEqual(raw.getvalue(), b"XYcdef")
1209 self.assertEqual(bufio.write(b"123456"), 6)
1210 bufio.flush()
1211 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001212
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001213 def test_flush(self):
1214 writer = self.MockRawIO()
1215 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001216 bufio.write(b"abc")
1217 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001218 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001219
Antoine Pitrou131a4892012-10-16 22:57:11 +02001220 def test_writelines(self):
1221 l = [b'ab', b'cd', b'ef']
1222 writer = self.MockRawIO()
1223 bufio = self.tp(writer, 8)
1224 bufio.writelines(l)
1225 bufio.flush()
1226 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1227
1228 def test_writelines_userlist(self):
1229 l = UserList([b'ab', b'cd', b'ef'])
1230 writer = self.MockRawIO()
1231 bufio = self.tp(writer, 8)
1232 bufio.writelines(l)
1233 bufio.flush()
1234 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1235
1236 def test_writelines_error(self):
1237 writer = self.MockRawIO()
1238 bufio = self.tp(writer, 8)
1239 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1240 self.assertRaises(TypeError, bufio.writelines, None)
1241 self.assertRaises(TypeError, bufio.writelines, 'abc')
1242
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001243 def test_destructor(self):
1244 writer = self.MockRawIO()
1245 bufio = self.tp(writer, 8)
1246 bufio.write(b"abc")
1247 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001248 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001249 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001250
1251 def test_truncate(self):
1252 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001253 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001254 bufio = self.tp(raw, 8)
1255 bufio.write(b"abcdef")
1256 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001257 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001258 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001259 self.assertEqual(f.read(), b"abc")
1260
Victor Stinner45df8202010-04-28 22:31:17 +00001261 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001262 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001263 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001264 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001265 # Write out many bytes from many threads and test they were
1266 # all flushed.
1267 N = 1000
1268 contents = bytes(range(256)) * N
1269 sizes = cycle([1, 19])
1270 n = 0
1271 queue = deque()
1272 while n < len(contents):
1273 size = next(sizes)
1274 queue.append(contents[n:n+size])
1275 n += size
1276 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001277 # We use a real file object because it allows us to
1278 # exercise situations where the GIL is released before
1279 # writing the buffer to the raw streams. This is in addition
1280 # to concurrency issues due to switching threads in the middle
1281 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001282 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001283 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001284 errors = []
1285 def f():
1286 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001287 while True:
1288 try:
1289 s = queue.popleft()
1290 except IndexError:
1291 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001292 bufio.write(s)
1293 except Exception as e:
1294 errors.append(e)
1295 raise
1296 threads = [threading.Thread(target=f) for x in range(20)]
1297 for t in threads:
1298 t.start()
1299 time.sleep(0.02) # yield
1300 for t in threads:
1301 t.join()
1302 self.assertFalse(errors,
1303 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001304 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001305 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001306 s = f.read()
1307 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001308 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001309 finally:
1310 support.unlink(support.TESTFN)
1311
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001312 def test_misbehaved_io(self):
1313 rawio = self.MisbehavedRawIO()
1314 bufio = self.tp(rawio, 5)
1315 self.assertRaises(IOError, bufio.seek, 0)
1316 self.assertRaises(IOError, bufio.tell)
1317 self.assertRaises(IOError, bufio.write, b"abcdef")
1318
Florent Xicluna109d5732012-07-07 17:03:22 +02001319 def test_max_buffer_size_removal(self):
1320 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001321 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001322
Benjamin Peterson68623612012-12-20 11:53:11 -06001323 def test_write_error_on_close(self):
1324 raw = self.MockRawIO()
1325 def bad_write(b):
1326 raise IOError()
1327 raw.write = bad_write
1328 b = self.tp(raw)
1329 b.write(b'spam')
1330 self.assertRaises(IOError, b.close) # exception not swallowed
1331 self.assertTrue(b.closed)
1332
Benjamin Peterson59406a92009-03-26 17:10:29 +00001333
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001334class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001335 tp = io.BufferedWriter
1336
1337 def test_constructor(self):
1338 BufferedWriterTest.test_constructor(self)
1339 # The allocation can succeed on 32-bit builds, e.g. with more
1340 # than 2GB RAM and a 64-bit kernel.
1341 if sys.maxsize > 0x7FFFFFFF:
1342 rawio = self.MockRawIO()
1343 bufio = self.tp(rawio)
1344 self.assertRaises((OverflowError, MemoryError, ValueError),
1345 bufio.__init__, rawio, sys.maxsize)
1346
1347 def test_initialization(self):
1348 rawio = self.MockRawIO()
1349 bufio = self.tp(rawio)
1350 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1351 self.assertRaises(ValueError, bufio.write, b"def")
1352 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1353 self.assertRaises(ValueError, bufio.write, b"def")
1354 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1355 self.assertRaises(ValueError, bufio.write, b"def")
1356
1357 def test_garbage_collection(self):
1358 # C BufferedWriter objects are collected, and collecting them flushes
1359 # all data to disk.
1360 # The Python version has __del__, so it ends into gc.garbage instead
1361 rawio = self.FileIO(support.TESTFN, "w+b")
1362 f = self.tp(rawio)
1363 f.write(b"123xxx")
1364 f.x = f
1365 wr = weakref.ref(f)
1366 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001367 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001368 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001369 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001370 self.assertEqual(f.read(), b"123xxx")
1371
R David Murray67bfe802013-02-23 21:51:05 -05001372 def test_args_error(self):
1373 # Issue #17275
1374 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1375 self.tp(io.BytesIO(), 1024, 1024, 1024)
1376
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001377
1378class PyBufferedWriterTest(BufferedWriterTest):
1379 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001380
Guido van Rossum01a27522007-03-07 01:00:12 +00001381class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001382
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001383 def test_constructor(self):
1384 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001385 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001386
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001387 def test_detach(self):
1388 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1389 self.assertRaises(self.UnsupportedOperation, pair.detach)
1390
Florent Xicluna109d5732012-07-07 17:03:22 +02001391 def test_constructor_max_buffer_size_removal(self):
1392 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001393 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001394
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001395 def test_constructor_with_not_readable(self):
1396 class NotReadable(MockRawIO):
1397 def readable(self):
1398 return False
1399
1400 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1401
1402 def test_constructor_with_not_writeable(self):
1403 class NotWriteable(MockRawIO):
1404 def writable(self):
1405 return False
1406
1407 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1408
1409 def test_read(self):
1410 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1411
1412 self.assertEqual(pair.read(3), b"abc")
1413 self.assertEqual(pair.read(1), b"d")
1414 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001415 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1416 self.assertEqual(pair.read(None), b"abc")
1417
1418 def test_readlines(self):
1419 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1420 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1421 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1422 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001423
1424 def test_read1(self):
1425 # .read1() is delegated to the underlying reader object, so this test
1426 # can be shallow.
1427 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1428
1429 self.assertEqual(pair.read1(3), b"abc")
1430
1431 def test_readinto(self):
1432 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1433
1434 data = bytearray(5)
1435 self.assertEqual(pair.readinto(data), 5)
1436 self.assertEqual(data, b"abcde")
1437
1438 def test_write(self):
1439 w = self.MockRawIO()
1440 pair = self.tp(self.MockRawIO(), w)
1441
1442 pair.write(b"abc")
1443 pair.flush()
1444 pair.write(b"def")
1445 pair.flush()
1446 self.assertEqual(w._write_stack, [b"abc", b"def"])
1447
1448 def test_peek(self):
1449 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1450
1451 self.assertTrue(pair.peek(3).startswith(b"abc"))
1452 self.assertEqual(pair.read(3), b"abc")
1453
1454 def test_readable(self):
1455 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1456 self.assertTrue(pair.readable())
1457
1458 def test_writeable(self):
1459 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1460 self.assertTrue(pair.writable())
1461
1462 def test_seekable(self):
1463 # BufferedRWPairs are never seekable, even if their readers and writers
1464 # are.
1465 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1466 self.assertFalse(pair.seekable())
1467
1468 # .flush() is delegated to the underlying writer object and has been
1469 # tested in the test_write method.
1470
1471 def test_close_and_closed(self):
1472 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1473 self.assertFalse(pair.closed)
1474 pair.close()
1475 self.assertTrue(pair.closed)
1476
1477 def test_isatty(self):
1478 class SelectableIsAtty(MockRawIO):
1479 def __init__(self, isatty):
1480 MockRawIO.__init__(self)
1481 self._isatty = isatty
1482
1483 def isatty(self):
1484 return self._isatty
1485
1486 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1487 self.assertFalse(pair.isatty())
1488
1489 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1490 self.assertTrue(pair.isatty())
1491
1492 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1493 self.assertTrue(pair.isatty())
1494
1495 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1496 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001497
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001498class CBufferedRWPairTest(BufferedRWPairTest):
1499 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001500
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001501class PyBufferedRWPairTest(BufferedRWPairTest):
1502 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001503
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001504
1505class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1506 read_mode = "rb+"
1507 write_mode = "wb+"
1508
1509 def test_constructor(self):
1510 BufferedReaderTest.test_constructor(self)
1511 BufferedWriterTest.test_constructor(self)
1512
1513 def test_read_and_write(self):
1514 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001515 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001516
1517 self.assertEqual(b"as", rw.read(2))
1518 rw.write(b"ddd")
1519 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001520 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001521 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001522 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001523
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001524 def test_seek_and_tell(self):
1525 raw = self.BytesIO(b"asdfghjkl")
1526 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001527
Ezio Melottib3aedd42010-11-20 19:04:17 +00001528 self.assertEqual(b"as", rw.read(2))
1529 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001530 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001531 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001532
Antoine Pitroue05565e2011-08-20 14:39:23 +02001533 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001534 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001535 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001536 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001537 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001538 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001539 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001540 self.assertEqual(7, rw.tell())
1541 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001542 rw.flush()
1543 self.assertEqual(b"asdf123fl", raw.getvalue())
1544
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001545 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001546
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001547 def check_flush_and_read(self, read_func):
1548 raw = self.BytesIO(b"abcdefghi")
1549 bufio = self.tp(raw)
1550
Ezio Melottib3aedd42010-11-20 19:04:17 +00001551 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001552 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001553 self.assertEqual(b"ef", read_func(bufio, 2))
1554 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001555 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001556 self.assertEqual(6, bufio.tell())
1557 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001558 raw.seek(0, 0)
1559 raw.write(b"XYZ")
1560 # flush() resets the read buffer
1561 bufio.flush()
1562 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001563 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001564
1565 def test_flush_and_read(self):
1566 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1567
1568 def test_flush_and_readinto(self):
1569 def _readinto(bufio, n=-1):
1570 b = bytearray(n if n >= 0 else 9999)
1571 n = bufio.readinto(b)
1572 return bytes(b[:n])
1573 self.check_flush_and_read(_readinto)
1574
1575 def test_flush_and_peek(self):
1576 def _peek(bufio, n=-1):
1577 # This relies on the fact that the buffer can contain the whole
1578 # raw stream, otherwise peek() can return less.
1579 b = bufio.peek(n)
1580 if n != -1:
1581 b = b[:n]
1582 bufio.seek(len(b), 1)
1583 return b
1584 self.check_flush_and_read(_peek)
1585
1586 def test_flush_and_write(self):
1587 raw = self.BytesIO(b"abcdefghi")
1588 bufio = self.tp(raw)
1589
1590 bufio.write(b"123")
1591 bufio.flush()
1592 bufio.write(b"45")
1593 bufio.flush()
1594 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001595 self.assertEqual(b"12345fghi", raw.getvalue())
1596 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001597
1598 def test_threads(self):
1599 BufferedReaderTest.test_threads(self)
1600 BufferedWriterTest.test_threads(self)
1601
1602 def test_writes_and_peek(self):
1603 def _peek(bufio):
1604 bufio.peek(1)
1605 self.check_writes(_peek)
1606 def _peek(bufio):
1607 pos = bufio.tell()
1608 bufio.seek(-1, 1)
1609 bufio.peek(1)
1610 bufio.seek(pos, 0)
1611 self.check_writes(_peek)
1612
1613 def test_writes_and_reads(self):
1614 def _read(bufio):
1615 bufio.seek(-1, 1)
1616 bufio.read(1)
1617 self.check_writes(_read)
1618
1619 def test_writes_and_read1s(self):
1620 def _read1(bufio):
1621 bufio.seek(-1, 1)
1622 bufio.read1(1)
1623 self.check_writes(_read1)
1624
1625 def test_writes_and_readintos(self):
1626 def _read(bufio):
1627 bufio.seek(-1, 1)
1628 bufio.readinto(bytearray(1))
1629 self.check_writes(_read)
1630
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001631 def test_write_after_readahead(self):
1632 # Issue #6629: writing after the buffer was filled by readahead should
1633 # first rewind the raw stream.
1634 for overwrite_size in [1, 5]:
1635 raw = self.BytesIO(b"A" * 10)
1636 bufio = self.tp(raw, 4)
1637 # Trigger readahead
1638 self.assertEqual(bufio.read(1), b"A")
1639 self.assertEqual(bufio.tell(), 1)
1640 # Overwriting should rewind the raw stream if it needs so
1641 bufio.write(b"B" * overwrite_size)
1642 self.assertEqual(bufio.tell(), overwrite_size + 1)
1643 # If the write size was smaller than the buffer size, flush() and
1644 # check that rewind happens.
1645 bufio.flush()
1646 self.assertEqual(bufio.tell(), overwrite_size + 1)
1647 s = raw.getvalue()
1648 self.assertEqual(s,
1649 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1650
Antoine Pitrou7c404892011-05-13 00:13:33 +02001651 def test_write_rewind_write(self):
1652 # Various combinations of reading / writing / seeking backwards / writing again
1653 def mutate(bufio, pos1, pos2):
1654 assert pos2 >= pos1
1655 # Fill the buffer
1656 bufio.seek(pos1)
1657 bufio.read(pos2 - pos1)
1658 bufio.write(b'\x02')
1659 # This writes earlier than the previous write, but still inside
1660 # the buffer.
1661 bufio.seek(pos1)
1662 bufio.write(b'\x01')
1663
1664 b = b"\x80\x81\x82\x83\x84"
1665 for i in range(0, len(b)):
1666 for j in range(i, len(b)):
1667 raw = self.BytesIO(b)
1668 bufio = self.tp(raw, 100)
1669 mutate(bufio, i, j)
1670 bufio.flush()
1671 expected = bytearray(b)
1672 expected[j] = 2
1673 expected[i] = 1
1674 self.assertEqual(raw.getvalue(), expected,
1675 "failed result for i=%d, j=%d" % (i, j))
1676
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001677 def test_truncate_after_read_or_write(self):
1678 raw = self.BytesIO(b"A" * 10)
1679 bufio = self.tp(raw, 100)
1680 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1681 self.assertEqual(bufio.truncate(), 2)
1682 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1683 self.assertEqual(bufio.truncate(), 4)
1684
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001685 def test_misbehaved_io(self):
1686 BufferedReaderTest.test_misbehaved_io(self)
1687 BufferedWriterTest.test_misbehaved_io(self)
1688
Antoine Pitroue05565e2011-08-20 14:39:23 +02001689 def test_interleaved_read_write(self):
1690 # Test for issue #12213
1691 with self.BytesIO(b'abcdefgh') as raw:
1692 with self.tp(raw, 100) as f:
1693 f.write(b"1")
1694 self.assertEqual(f.read(1), b'b')
1695 f.write(b'2')
1696 self.assertEqual(f.read1(1), b'd')
1697 f.write(b'3')
1698 buf = bytearray(1)
1699 f.readinto(buf)
1700 self.assertEqual(buf, b'f')
1701 f.write(b'4')
1702 self.assertEqual(f.peek(1), b'h')
1703 f.flush()
1704 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1705
1706 with self.BytesIO(b'abc') as raw:
1707 with self.tp(raw, 100) as f:
1708 self.assertEqual(f.read(1), b'a')
1709 f.write(b"2")
1710 self.assertEqual(f.read(1), b'c')
1711 f.flush()
1712 self.assertEqual(raw.getvalue(), b'a2c')
1713
1714 def test_interleaved_readline_write(self):
1715 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1716 with self.tp(raw) as f:
1717 f.write(b'1')
1718 self.assertEqual(f.readline(), b'b\n')
1719 f.write(b'2')
1720 self.assertEqual(f.readline(), b'def\n')
1721 f.write(b'3')
1722 self.assertEqual(f.readline(), b'\n')
1723 f.flush()
1724 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1725
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001726 # You can't construct a BufferedRandom over a non-seekable stream.
1727 test_unseekable = None
1728
R David Murray67bfe802013-02-23 21:51:05 -05001729
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001730class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001731 tp = io.BufferedRandom
1732
1733 def test_constructor(self):
1734 BufferedRandomTest.test_constructor(self)
1735 # The allocation can succeed on 32-bit builds, e.g. with more
1736 # than 2GB RAM and a 64-bit kernel.
1737 if sys.maxsize > 0x7FFFFFFF:
1738 rawio = self.MockRawIO()
1739 bufio = self.tp(rawio)
1740 self.assertRaises((OverflowError, MemoryError, ValueError),
1741 bufio.__init__, rawio, sys.maxsize)
1742
1743 def test_garbage_collection(self):
1744 CBufferedReaderTest.test_garbage_collection(self)
1745 CBufferedWriterTest.test_garbage_collection(self)
1746
R David Murray67bfe802013-02-23 21:51:05 -05001747 def test_args_error(self):
1748 # Issue #17275
1749 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
1750 self.tp(io.BytesIO(), 1024, 1024, 1024)
1751
1752
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001753class PyBufferedRandomTest(BufferedRandomTest):
1754 tp = pyio.BufferedRandom
1755
1756
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001757# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1758# properties:
1759# - A single output character can correspond to many bytes of input.
1760# - The number of input bytes to complete the character can be
1761# undetermined until the last input byte is received.
1762# - The number of input bytes can vary depending on previous input.
1763# - A single input byte can correspond to many characters of output.
1764# - The number of output characters can be undetermined until the
1765# last input byte is received.
1766# - The number of output characters can vary depending on previous input.
1767
1768class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1769 """
1770 For testing seek/tell behavior with a stateful, buffering decoder.
1771
1772 Input is a sequence of words. Words may be fixed-length (length set
1773 by input) or variable-length (period-terminated). In variable-length
1774 mode, extra periods are ignored. Possible words are:
1775 - 'i' followed by a number sets the input length, I (maximum 99).
1776 When I is set to 0, words are space-terminated.
1777 - 'o' followed by a number sets the output length, O (maximum 99).
1778 - Any other word is converted into a word followed by a period on
1779 the output. The output word consists of the input word truncated
1780 or padded out with hyphens to make its length equal to O. If O
1781 is 0, the word is output verbatim without truncating or padding.
1782 I and O are initially set to 1. When I changes, any buffered input is
1783 re-scanned according to the new I. EOF also terminates the last word.
1784 """
1785
1786 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001787 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001788 self.reset()
1789
1790 def __repr__(self):
1791 return '<SID %x>' % id(self)
1792
1793 def reset(self):
1794 self.i = 1
1795 self.o = 1
1796 self.buffer = bytearray()
1797
1798 def getstate(self):
1799 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1800 return bytes(self.buffer), i*100 + o
1801
1802 def setstate(self, state):
1803 buffer, io = state
1804 self.buffer = bytearray(buffer)
1805 i, o = divmod(io, 100)
1806 self.i, self.o = i ^ 1, o ^ 1
1807
1808 def decode(self, input, final=False):
1809 output = ''
1810 for b in input:
1811 if self.i == 0: # variable-length, terminated with period
1812 if b == ord('.'):
1813 if self.buffer:
1814 output += self.process_word()
1815 else:
1816 self.buffer.append(b)
1817 else: # fixed-length, terminate after self.i bytes
1818 self.buffer.append(b)
1819 if len(self.buffer) == self.i:
1820 output += self.process_word()
1821 if final and self.buffer: # EOF terminates the last word
1822 output += self.process_word()
1823 return output
1824
1825 def process_word(self):
1826 output = ''
1827 if self.buffer[0] == ord('i'):
1828 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1829 elif self.buffer[0] == ord('o'):
1830 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1831 else:
1832 output = self.buffer.decode('ascii')
1833 if len(output) < self.o:
1834 output += '-'*self.o # pad out with hyphens
1835 if self.o:
1836 output = output[:self.o] # truncate to output length
1837 output += '.'
1838 self.buffer = bytearray()
1839 return output
1840
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001841 codecEnabled = False
1842
1843 @classmethod
1844 def lookupTestDecoder(cls, name):
1845 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001846 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001847 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001848 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001849 incrementalencoder=None,
1850 streamreader=None, streamwriter=None,
1851 incrementaldecoder=cls)
1852
1853# Register the previous decoder for testing.
1854# Disabled by default, tests will enable it.
1855codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1856
1857
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001858class StatefulIncrementalDecoderTest(unittest.TestCase):
1859 """
1860 Make sure the StatefulIncrementalDecoder actually works.
1861 """
1862
1863 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001864 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001865 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001866 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001867 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001868 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001869 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001870 # I=0, O=6 (variable-length input, fixed-length output)
1871 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1872 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001873 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001874 # I=6, O=3 (fixed-length input > fixed-length output)
1875 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1876 # I=0, then 3; O=29, then 15 (with longer output)
1877 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1878 'a----------------------------.' +
1879 'b----------------------------.' +
1880 'cde--------------------------.' +
1881 'abcdefghijabcde.' +
1882 'a.b------------.' +
1883 '.c.------------.' +
1884 'd.e------------.' +
1885 'k--------------.' +
1886 'l--------------.' +
1887 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001888 ]
1889
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001890 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001891 # Try a few one-shot test cases.
1892 for input, eof, output in self.test_cases:
1893 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001894 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001895
1896 # Also test an unfinished decode, followed by forcing EOF.
1897 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001898 self.assertEqual(d.decode(b'oiabcd'), '')
1899 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001900
1901class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001902
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001903 def setUp(self):
1904 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1905 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001906 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001907
Guido van Rossumd0712812007-04-11 16:32:43 +00001908 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001909 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001910
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001911 def test_constructor(self):
1912 r = self.BytesIO(b"\xc3\xa9\n\n")
1913 b = self.BufferedReader(r, 1000)
1914 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001915 t.__init__(b, encoding="latin-1", newline="\r\n")
1916 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001917 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001918 t.__init__(b, encoding="utf-8", line_buffering=True)
1919 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001920 self.assertEqual(t.line_buffering, True)
1921 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001922 self.assertRaises(TypeError, t.__init__, b, newline=42)
1923 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1924
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001925 def test_detach(self):
1926 r = self.BytesIO()
1927 b = self.BufferedWriter(r)
1928 t = self.TextIOWrapper(b)
1929 self.assertIs(t.detach(), b)
1930
1931 t = self.TextIOWrapper(b, encoding="ascii")
1932 t.write("howdy")
1933 self.assertFalse(r.getvalue())
1934 t.detach()
1935 self.assertEqual(r.getvalue(), b"howdy")
1936 self.assertRaises(ValueError, t.detach)
1937
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001938 def test_repr(self):
1939 raw = self.BytesIO("hello".encode("utf-8"))
1940 b = self.BufferedReader(raw)
1941 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001942 modname = self.TextIOWrapper.__module__
1943 self.assertEqual(repr(t),
1944 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1945 raw.name = "dummy"
1946 self.assertEqual(repr(t),
1947 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001948 t.mode = "r"
1949 self.assertEqual(repr(t),
1950 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001951 raw.name = b"dummy"
1952 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001953 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001954
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001955 def test_line_buffering(self):
1956 r = self.BytesIO()
1957 b = self.BufferedWriter(r, 1000)
1958 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001959 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001960 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001961 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001962 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001963 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001964 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001965
Victor Stinnerf86a5e82012-06-05 13:43:22 +02001966 def test_default_encoding(self):
1967 old_environ = dict(os.environ)
1968 try:
1969 # try to get a user preferred encoding different than the current
1970 # locale encoding to check that TextIOWrapper() uses the current
1971 # locale encoding and not the user preferred encoding
1972 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
1973 if key in os.environ:
1974 del os.environ[key]
1975
1976 current_locale_encoding = locale.getpreferredencoding(False)
1977 b = self.BytesIO()
1978 t = self.TextIOWrapper(b)
1979 self.assertEqual(t.encoding, current_locale_encoding)
1980 finally:
1981 os.environ.clear()
1982 os.environ.update(old_environ)
1983
Serhiy Storchaka441d30f2013-01-19 12:26:26 +02001984 # Issue 15989
1985 def test_device_encoding(self):
1986 b = self.BytesIO()
1987 b.fileno = lambda: _testcapi.INT_MAX + 1
1988 self.assertRaises(OverflowError, self.TextIOWrapper, b)
1989 b.fileno = lambda: _testcapi.UINT_MAX + 1
1990 self.assertRaises(OverflowError, self.TextIOWrapper, b)
1991
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001992 def test_encoding(self):
1993 # Check the encoding attribute is always set, and valid
1994 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001995 t = self.TextIOWrapper(b, encoding="utf-8")
1996 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001997 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001998 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001999 codecs.lookup(t.encoding)
2000
2001 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002002 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002003 b = self.BytesIO(b"abc\n\xff\n")
2004 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002005 self.assertRaises(UnicodeError, t.read)
2006 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002007 b = self.BytesIO(b"abc\n\xff\n")
2008 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002009 self.assertRaises(UnicodeError, t.read)
2010 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002011 b = self.BytesIO(b"abc\n\xff\n")
2012 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002013 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002014 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002015 b = self.BytesIO(b"abc\n\xff\n")
2016 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002017 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002018
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002019 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002020 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002021 b = self.BytesIO()
2022 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002023 self.assertRaises(UnicodeError, t.write, "\xff")
2024 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002025 b = self.BytesIO()
2026 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002027 self.assertRaises(UnicodeError, t.write, "\xff")
2028 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002029 b = self.BytesIO()
2030 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002031 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002032 t.write("abc\xffdef\n")
2033 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002034 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002035 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002036 b = self.BytesIO()
2037 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002038 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002039 t.write("abc\xffdef\n")
2040 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002041 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002042
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002043 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002044 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2045
2046 tests = [
2047 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002048 [ '', input_lines ],
2049 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2050 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2051 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002052 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002053 encodings = (
2054 'utf-8', 'latin-1',
2055 'utf-16', 'utf-16-le', 'utf-16-be',
2056 'utf-32', 'utf-32-le', 'utf-32-be',
2057 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002058
Guido van Rossum8358db22007-08-18 21:39:55 +00002059 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002060 # character in TextIOWrapper._pending_line.
2061 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002062 # XXX: str.encode() should return bytes
2063 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002064 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002065 for bufsize in range(1, 10):
2066 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002067 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2068 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002069 encoding=encoding)
2070 if do_reads:
2071 got_lines = []
2072 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002073 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002074 if c2 == '':
2075 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002076 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002077 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002078 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002079 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002080
2081 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002082 self.assertEqual(got_line, exp_line)
2083 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002084
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002085 def test_newlines_input(self):
2086 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002087 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2088 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002089 (None, normalized.decode("ascii").splitlines(keepends=True)),
2090 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002091 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2092 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2093 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002094 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002095 buf = self.BytesIO(testdata)
2096 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002097 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002098 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002099 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002100
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002101 def test_newlines_output(self):
2102 testdict = {
2103 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2104 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2105 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2106 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2107 }
2108 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2109 for newline, expected in tests:
2110 buf = self.BytesIO()
2111 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2112 txt.write("AAA\nB")
2113 txt.write("BB\nCCC\n")
2114 txt.write("X\rY\r\nZ")
2115 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002116 self.assertEqual(buf.closed, False)
2117 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002118
2119 def test_destructor(self):
2120 l = []
2121 base = self.BytesIO
2122 class MyBytesIO(base):
2123 def close(self):
2124 l.append(self.getvalue())
2125 base.close(self)
2126 b = MyBytesIO()
2127 t = self.TextIOWrapper(b, encoding="ascii")
2128 t.write("abc")
2129 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002130 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002131 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002132
2133 def test_override_destructor(self):
2134 record = []
2135 class MyTextIO(self.TextIOWrapper):
2136 def __del__(self):
2137 record.append(1)
2138 try:
2139 f = super().__del__
2140 except AttributeError:
2141 pass
2142 else:
2143 f()
2144 def close(self):
2145 record.append(2)
2146 super().close()
2147 def flush(self):
2148 record.append(3)
2149 super().flush()
2150 b = self.BytesIO()
2151 t = MyTextIO(b, encoding="ascii")
2152 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002153 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002154 self.assertEqual(record, [1, 2, 3])
2155
2156 def test_error_through_destructor(self):
2157 # Test that the exception state is not modified by a destructor,
2158 # even if close() fails.
2159 rawio = self.CloseFailureIO()
2160 def f():
2161 self.TextIOWrapper(rawio).xyzzy
2162 with support.captured_output("stderr") as s:
2163 self.assertRaises(AttributeError, f)
2164 s = s.getvalue().strip()
2165 if s:
2166 # The destructor *may* have printed an unraisable error, check it
2167 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002168 self.assertTrue(s.startswith("Exception IOError: "), s)
2169 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002170
Guido van Rossum9b76da62007-04-11 01:09:03 +00002171 # Systematic tests of the text I/O API
2172
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002173 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002174 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002175 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002176 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002177 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002178 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002179 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002180 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002181 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002182 self.assertEqual(f.tell(), 0)
2183 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002184 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002185 self.assertEqual(f.seek(0), 0)
2186 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002187 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002188 self.assertEqual(f.read(2), "ab")
2189 self.assertEqual(f.read(1), "c")
2190 self.assertEqual(f.read(1), "")
2191 self.assertEqual(f.read(), "")
2192 self.assertEqual(f.tell(), cookie)
2193 self.assertEqual(f.seek(0), 0)
2194 self.assertEqual(f.seek(0, 2), cookie)
2195 self.assertEqual(f.write("def"), 3)
2196 self.assertEqual(f.seek(cookie), cookie)
2197 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002198 if enc.startswith("utf"):
2199 self.multi_line_test(f, enc)
2200 f.close()
2201
2202 def multi_line_test(self, f, enc):
2203 f.seek(0)
2204 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002205 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002206 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002207 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002208 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002209 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002210 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002211 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002212 wlines.append((f.tell(), line))
2213 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002214 f.seek(0)
2215 rlines = []
2216 while True:
2217 pos = f.tell()
2218 line = f.readline()
2219 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002220 break
2221 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002222 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002223
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002224 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002225 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002226 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002227 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002228 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002229 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002230 p2 = f.tell()
2231 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002232 self.assertEqual(f.tell(), p0)
2233 self.assertEqual(f.readline(), "\xff\n")
2234 self.assertEqual(f.tell(), p1)
2235 self.assertEqual(f.readline(), "\xff\n")
2236 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002237 f.seek(0)
2238 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002239 self.assertEqual(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002240 self.assertRaises(IOError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002241 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002242 f.close()
2243
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002244 def test_seeking(self):
2245 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002246 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002247 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002248 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002249 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002250 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002251 suffix = bytes(u_suffix.encode("utf-8"))
2252 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002253 with self.open(support.TESTFN, "wb") as f:
2254 f.write(line*2)
2255 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2256 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002257 self.assertEqual(s, str(prefix, "ascii"))
2258 self.assertEqual(f.tell(), prefix_size)
2259 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002260
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002261 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002262 # Regression test for a specific bug
2263 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002264 with self.open(support.TESTFN, "wb") as f:
2265 f.write(data)
2266 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2267 f._CHUNK_SIZE # Just test that it exists
2268 f._CHUNK_SIZE = 2
2269 f.readline()
2270 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002271
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002272 def test_seek_and_tell(self):
2273 #Test seek/tell using the StatefulIncrementalDecoder.
2274 # Make test faster by doing smaller seeks
2275 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002276
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002277 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002278 """Tell/seek to various points within a data stream and ensure
2279 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002280 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002281 f.write(data)
2282 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002283 f = self.open(support.TESTFN, encoding='test_decoder')
2284 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002285 decoded = f.read()
2286 f.close()
2287
Neal Norwitze2b07052008-03-18 19:52:05 +00002288 for i in range(min_pos, len(decoded) + 1): # seek positions
2289 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002290 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002291 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002292 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002293 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002294 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002295 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002296 f.close()
2297
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002298 # Enable the test decoder.
2299 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002300
2301 # Run the tests.
2302 try:
2303 # Try each test case.
2304 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002305 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002306
2307 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002308 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2309 offset = CHUNK_SIZE - len(input)//2
2310 prefix = b'.'*offset
2311 # Don't bother seeking into the prefix (takes too long).
2312 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002313 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002314
2315 # Ensure our test decoder won't interfere with subsequent tests.
2316 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002317 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002318
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002319 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002320 data = "1234567890"
2321 tests = ("utf-16",
2322 "utf-16-le",
2323 "utf-16-be",
2324 "utf-32",
2325 "utf-32-le",
2326 "utf-32-be")
2327 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002328 buf = self.BytesIO()
2329 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002330 # Check if the BOM is written only once (see issue1753).
2331 f.write(data)
2332 f.write(data)
2333 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002334 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002335 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002336 self.assertEqual(f.read(), data * 2)
2337 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002338
Benjamin Petersona1b49012009-03-31 23:11:32 +00002339 def test_unreadable(self):
2340 class UnReadable(self.BytesIO):
2341 def readable(self):
2342 return False
2343 txt = self.TextIOWrapper(UnReadable())
2344 self.assertRaises(IOError, txt.read)
2345
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002346 def test_read_one_by_one(self):
2347 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002348 reads = ""
2349 while True:
2350 c = txt.read(1)
2351 if not c:
2352 break
2353 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002354 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002355
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002356 def test_readlines(self):
2357 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2358 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2359 txt.seek(0)
2360 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2361 txt.seek(0)
2362 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2363
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002364 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002365 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002366 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002367 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002368 reads = ""
2369 while True:
2370 c = txt.read(128)
2371 if not c:
2372 break
2373 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002374 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002375
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002376 def test_writelines(self):
2377 l = ['ab', 'cd', 'ef']
2378 buf = self.BytesIO()
2379 txt = self.TextIOWrapper(buf)
2380 txt.writelines(l)
2381 txt.flush()
2382 self.assertEqual(buf.getvalue(), b'abcdef')
2383
2384 def test_writelines_userlist(self):
2385 l = UserList(['ab', 'cd', 'ef'])
2386 buf = self.BytesIO()
2387 txt = self.TextIOWrapper(buf)
2388 txt.writelines(l)
2389 txt.flush()
2390 self.assertEqual(buf.getvalue(), b'abcdef')
2391
2392 def test_writelines_error(self):
2393 txt = self.TextIOWrapper(self.BytesIO())
2394 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2395 self.assertRaises(TypeError, txt.writelines, None)
2396 self.assertRaises(TypeError, txt.writelines, b'abc')
2397
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002398 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002399 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002400
2401 # read one char at a time
2402 reads = ""
2403 while True:
2404 c = txt.read(1)
2405 if not c:
2406 break
2407 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002408 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002409
2410 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002411 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002412 txt._CHUNK_SIZE = 4
2413
2414 reads = ""
2415 while True:
2416 c = txt.read(4)
2417 if not c:
2418 break
2419 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002420 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002421
2422 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002423 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002424 txt._CHUNK_SIZE = 4
2425
2426 reads = txt.read(4)
2427 reads += txt.read(4)
2428 reads += txt.readline()
2429 reads += txt.readline()
2430 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002431 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002432
2433 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002434 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002435 txt._CHUNK_SIZE = 4
2436
2437 reads = txt.read(4)
2438 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002439 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002440
2441 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002442 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002443 txt._CHUNK_SIZE = 4
2444
2445 reads = txt.read(4)
2446 pos = txt.tell()
2447 txt.seek(0)
2448 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002449 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002450
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002451 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002452 buffer = self.BytesIO(self.testdata)
2453 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002454
2455 self.assertEqual(buffer.seekable(), txt.seekable())
2456
Antoine Pitroue4501852009-05-14 18:55:55 +00002457 def test_append_bom(self):
2458 # The BOM is not written again when appending to a non-empty file
2459 filename = support.TESTFN
2460 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2461 with self.open(filename, 'w', encoding=charset) as f:
2462 f.write('aaa')
2463 pos = f.tell()
2464 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002465 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002466
2467 with self.open(filename, 'a', encoding=charset) as f:
2468 f.write('xxx')
2469 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002470 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002471
2472 def test_seek_bom(self):
2473 # Same test, but when seeking manually
2474 filename = support.TESTFN
2475 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2476 with self.open(filename, 'w', encoding=charset) as f:
2477 f.write('aaa')
2478 pos = f.tell()
2479 with self.open(filename, 'r+', encoding=charset) as f:
2480 f.seek(pos)
2481 f.write('zzz')
2482 f.seek(0)
2483 f.write('bbb')
2484 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002485 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002486
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002487 def test_errors_property(self):
2488 with self.open(support.TESTFN, "w") as f:
2489 self.assertEqual(f.errors, "strict")
2490 with self.open(support.TESTFN, "w", errors="replace") as f:
2491 self.assertEqual(f.errors, "replace")
2492
Brett Cannon31f59292011-02-21 19:29:56 +00002493 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002494 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002495 def test_threads_write(self):
2496 # Issue6750: concurrent writes could duplicate data
2497 event = threading.Event()
2498 with self.open(support.TESTFN, "w", buffering=1) as f:
2499 def run(n):
2500 text = "Thread%03d\n" % n
2501 event.wait()
2502 f.write(text)
2503 threads = [threading.Thread(target=lambda n=x: run(n))
2504 for x in range(20)]
2505 for t in threads:
2506 t.start()
2507 time.sleep(0.02)
2508 event.set()
2509 for t in threads:
2510 t.join()
2511 with self.open(support.TESTFN) as f:
2512 content = f.read()
2513 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002514 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002515
Antoine Pitrou6be88762010-05-03 16:48:20 +00002516 def test_flush_error_on_close(self):
2517 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2518 def bad_flush():
2519 raise IOError()
2520 txt.flush = bad_flush
2521 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002522 self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002523
2524 def test_multi_close(self):
2525 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2526 txt.close()
2527 txt.close()
2528 txt.close()
2529 self.assertRaises(ValueError, txt.flush)
2530
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002531 def test_unseekable(self):
2532 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2533 self.assertRaises(self.UnsupportedOperation, txt.tell)
2534 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2535
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002536 def test_readonly_attributes(self):
2537 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2538 buf = self.BytesIO(self.testdata)
2539 with self.assertRaises(AttributeError):
2540 txt.buffer = buf
2541
Antoine Pitroue96ec682011-07-23 21:46:35 +02002542 def test_rawio(self):
2543 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2544 # that subprocess.Popen() can have the required unbuffered
2545 # semantics with universal_newlines=True.
2546 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2547 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2548 # Reads
2549 self.assertEqual(txt.read(4), 'abcd')
2550 self.assertEqual(txt.readline(), 'efghi\n')
2551 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2552
2553 def test_rawio_write_through(self):
2554 # Issue #12591: with write_through=True, writes don't need a flush
2555 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2556 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2557 write_through=True)
2558 txt.write('1')
2559 txt.write('23\n4')
2560 txt.write('5')
2561 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2562
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002563 def test_read_nonbytes(self):
2564 # Issue #17106
2565 # Crash when underlying read() returns non-bytes
2566 t = self.TextIOWrapper(self.StringIO('a'))
2567 self.assertRaises(TypeError, t.read, 1)
2568 t = self.TextIOWrapper(self.StringIO('a'))
2569 self.assertRaises(TypeError, t.readline)
2570 t = self.TextIOWrapper(self.StringIO('a'))
2571 self.assertRaises(TypeError, t.read)
2572
2573 def test_illegal_decoder(self):
2574 # Issue #17106
2575 # Crash when decoder returns non-string
2576 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2577 encoding='quopri_codec')
2578 self.assertRaises(TypeError, t.read, 1)
2579 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2580 encoding='quopri_codec')
2581 self.assertRaises(TypeError, t.readline)
2582 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2583 encoding='quopri_codec')
2584 self.assertRaises(TypeError, t.read)
2585
2586
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002587class CTextIOWrapperTest(TextIOWrapperTest):
2588
2589 def test_initialization(self):
2590 r = self.BytesIO(b"\xc3\xa9\n\n")
2591 b = self.BufferedReader(r, 1000)
2592 t = self.TextIOWrapper(b)
2593 self.assertRaises(TypeError, t.__init__, b, newline=42)
2594 self.assertRaises(ValueError, t.read)
2595 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2596 self.assertRaises(ValueError, t.read)
2597
2598 def test_garbage_collection(self):
2599 # C TextIOWrapper objects are collected, and collecting them flushes
2600 # all data to disk.
2601 # The Python version has __del__, so it ends in gc.garbage instead.
2602 rawio = io.FileIO(support.TESTFN, "wb")
2603 b = self.BufferedWriter(rawio)
2604 t = self.TextIOWrapper(b, encoding="ascii")
2605 t.write("456def")
2606 t.x = t
2607 wr = weakref.ref(t)
2608 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002609 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002610 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002611 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002612 self.assertEqual(f.read(), b"456def")
2613
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002614 def test_rwpair_cleared_before_textio(self):
2615 # Issue 13070: TextIOWrapper's finalization would crash when called
2616 # after the reference to the underlying BufferedRWPair's writer got
2617 # cleared by the GC.
2618 for i in range(1000):
2619 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2620 t1 = self.TextIOWrapper(b1, encoding="ascii")
2621 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2622 t2 = self.TextIOWrapper(b2, encoding="ascii")
2623 # circular references
2624 t1.buddy = t2
2625 t2.buddy = t1
2626 support.gc_collect()
2627
2628
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002629class PyTextIOWrapperTest(TextIOWrapperTest):
2630 pass
2631
2632
2633class IncrementalNewlineDecoderTest(unittest.TestCase):
2634
2635 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002636 # UTF-8 specific tests for a newline decoder
2637 def _check_decode(b, s, **kwargs):
2638 # We exercise getstate() / setstate() as well as decode()
2639 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002640 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002641 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002642 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002643
Antoine Pitrou180a3362008-12-14 16:36:46 +00002644 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002645
Antoine Pitrou180a3362008-12-14 16:36:46 +00002646 _check_decode(b'\xe8', "")
2647 _check_decode(b'\xa2', "")
2648 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002649
Antoine Pitrou180a3362008-12-14 16:36:46 +00002650 _check_decode(b'\xe8', "")
2651 _check_decode(b'\xa2', "")
2652 _check_decode(b'\x88', "\u8888")
2653
2654 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002655 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2656
Antoine Pitrou180a3362008-12-14 16:36:46 +00002657 decoder.reset()
2658 _check_decode(b'\n', "\n")
2659 _check_decode(b'\r', "")
2660 _check_decode(b'', "\n", final=True)
2661 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002662
Antoine Pitrou180a3362008-12-14 16:36:46 +00002663 _check_decode(b'\r', "")
2664 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002665
Antoine Pitrou180a3362008-12-14 16:36:46 +00002666 _check_decode(b'\r\r\n', "\n\n")
2667 _check_decode(b'\r', "")
2668 _check_decode(b'\r', "\n")
2669 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002670
Antoine Pitrou180a3362008-12-14 16:36:46 +00002671 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2672 _check_decode(b'\xe8\xa2\x88', "\u8888")
2673 _check_decode(b'\n', "\n")
2674 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2675 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002676
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002677 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002678 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002679 if encoding is not None:
2680 encoder = codecs.getincrementalencoder(encoding)()
2681 def _decode_bytewise(s):
2682 # Decode one byte at a time
2683 for b in encoder.encode(s):
2684 result.append(decoder.decode(bytes([b])))
2685 else:
2686 encoder = None
2687 def _decode_bytewise(s):
2688 # Decode one char at a time
2689 for c in s:
2690 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002691 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002692 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002693 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002694 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002695 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002696 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002697 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002698 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002699 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002700 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002701 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002702 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002703 input = "abc"
2704 if encoder is not None:
2705 encoder.reset()
2706 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002707 self.assertEqual(decoder.decode(input), "abc")
2708 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002709
2710 def test_newline_decoder(self):
2711 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002712 # None meaning the IncrementalNewlineDecoder takes unicode input
2713 # rather than bytes input
2714 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002715 'utf-16', 'utf-16-le', 'utf-16-be',
2716 'utf-32', 'utf-32-le', 'utf-32-be',
2717 )
2718 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002719 decoder = enc and codecs.getincrementaldecoder(enc)()
2720 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2721 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002722 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002723 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2724 self.check_newline_decoding_utf8(decoder)
2725
Antoine Pitrou66913e22009-03-06 23:40:56 +00002726 def test_newline_bytes(self):
2727 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2728 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002729 self.assertEqual(dec.newlines, None)
2730 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2731 self.assertEqual(dec.newlines, None)
2732 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2733 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002734 dec = self.IncrementalNewlineDecoder(None, translate=False)
2735 _check(dec)
2736 dec = self.IncrementalNewlineDecoder(None, translate=True)
2737 _check(dec)
2738
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002739class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2740 pass
2741
2742class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2743 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002744
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002745
Guido van Rossum01a27522007-03-07 01:00:12 +00002746# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002747
Guido van Rossum5abbf752007-08-27 17:39:33 +00002748class MiscIOTest(unittest.TestCase):
2749
Barry Warsaw40e82462008-11-20 20:14:50 +00002750 def tearDown(self):
2751 support.unlink(support.TESTFN)
2752
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002753 def test___all__(self):
2754 for name in self.io.__all__:
2755 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002756 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002757 if name == "open":
2758 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002759 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002760 self.assertTrue(issubclass(obj, Exception), name)
2761 elif not name.startswith("SEEK_"):
2762 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002763
Barry Warsaw40e82462008-11-20 20:14:50 +00002764 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002765 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002766 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002767 f.close()
2768
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002769 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002770 self.assertEqual(f.name, support.TESTFN)
2771 self.assertEqual(f.buffer.name, support.TESTFN)
2772 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2773 self.assertEqual(f.mode, "U")
2774 self.assertEqual(f.buffer.mode, "rb")
2775 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002776 f.close()
2777
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002778 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002779 self.assertEqual(f.mode, "w+")
2780 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2781 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002782
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002783 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002784 self.assertEqual(g.mode, "wb")
2785 self.assertEqual(g.raw.mode, "wb")
2786 self.assertEqual(g.name, f.fileno())
2787 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002788 f.close()
2789 g.close()
2790
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002791 def test_io_after_close(self):
2792 for kwargs in [
2793 {"mode": "w"},
2794 {"mode": "wb"},
2795 {"mode": "w", "buffering": 1},
2796 {"mode": "w", "buffering": 2},
2797 {"mode": "wb", "buffering": 0},
2798 {"mode": "r"},
2799 {"mode": "rb"},
2800 {"mode": "r", "buffering": 1},
2801 {"mode": "r", "buffering": 2},
2802 {"mode": "rb", "buffering": 0},
2803 {"mode": "w+"},
2804 {"mode": "w+b"},
2805 {"mode": "w+", "buffering": 1},
2806 {"mode": "w+", "buffering": 2},
2807 {"mode": "w+b", "buffering": 0},
2808 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002809 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002810 f.close()
2811 self.assertRaises(ValueError, f.flush)
2812 self.assertRaises(ValueError, f.fileno)
2813 self.assertRaises(ValueError, f.isatty)
2814 self.assertRaises(ValueError, f.__iter__)
2815 if hasattr(f, "peek"):
2816 self.assertRaises(ValueError, f.peek, 1)
2817 self.assertRaises(ValueError, f.read)
2818 if hasattr(f, "read1"):
2819 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002820 if hasattr(f, "readall"):
2821 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002822 if hasattr(f, "readinto"):
2823 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2824 self.assertRaises(ValueError, f.readline)
2825 self.assertRaises(ValueError, f.readlines)
2826 self.assertRaises(ValueError, f.seek, 0)
2827 self.assertRaises(ValueError, f.tell)
2828 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002829 self.assertRaises(ValueError, f.write,
2830 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002831 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002832 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002833
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002834 def test_blockingioerror(self):
2835 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002836 class C(str):
2837 pass
2838 c = C("")
2839 b = self.BlockingIOError(1, c)
2840 c.b = b
2841 b.c = c
2842 wr = weakref.ref(c)
2843 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002844 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002845 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002846
2847 def test_abcs(self):
2848 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002849 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2850 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2851 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2852 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002853
2854 def _check_abc_inheritance(self, abcmodule):
2855 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002856 self.assertIsInstance(f, abcmodule.IOBase)
2857 self.assertIsInstance(f, abcmodule.RawIOBase)
2858 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2859 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002860 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002861 self.assertIsInstance(f, abcmodule.IOBase)
2862 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2863 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2864 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002865 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002866 self.assertIsInstance(f, abcmodule.IOBase)
2867 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2868 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2869 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002870
2871 def test_abc_inheritance(self):
2872 # Test implementations inherit from their respective ABCs
2873 self._check_abc_inheritance(self)
2874
2875 def test_abc_inheritance_official(self):
2876 # Test implementations inherit from the official ABCs of the
2877 # baseline "io" module.
2878 self._check_abc_inheritance(io)
2879
Antoine Pitroue033e062010-10-29 10:38:18 +00002880 def _check_warn_on_dealloc(self, *args, **kwargs):
2881 f = open(*args, **kwargs)
2882 r = repr(f)
2883 with self.assertWarns(ResourceWarning) as cm:
2884 f = None
2885 support.gc_collect()
2886 self.assertIn(r, str(cm.warning.args[0]))
2887
2888 def test_warn_on_dealloc(self):
2889 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2890 self._check_warn_on_dealloc(support.TESTFN, "wb")
2891 self._check_warn_on_dealloc(support.TESTFN, "w")
2892
2893 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2894 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002895 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002896 for fd in fds:
2897 try:
2898 os.close(fd)
2899 except EnvironmentError as e:
2900 if e.errno != errno.EBADF:
2901 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002902 self.addCleanup(cleanup_fds)
2903 r, w = os.pipe()
2904 fds += r, w
2905 self._check_warn_on_dealloc(r, *args, **kwargs)
2906 # When using closefd=False, there's no warning
2907 r, w = os.pipe()
2908 fds += r, w
2909 with warnings.catch_warnings(record=True) as recorded:
2910 open(r, *args, closefd=False, **kwargs)
2911 support.gc_collect()
2912 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002913
2914 def test_warn_on_dealloc_fd(self):
2915 self._check_warn_on_dealloc_fd("rb", buffering=0)
2916 self._check_warn_on_dealloc_fd("rb")
2917 self._check_warn_on_dealloc_fd("r")
2918
2919
Antoine Pitrou243757e2010-11-05 21:15:39 +00002920 def test_pickling(self):
2921 # Pickling file objects is forbidden
2922 for kwargs in [
2923 {"mode": "w"},
2924 {"mode": "wb"},
2925 {"mode": "wb", "buffering": 0},
2926 {"mode": "r"},
2927 {"mode": "rb"},
2928 {"mode": "rb", "buffering": 0},
2929 {"mode": "w+"},
2930 {"mode": "w+b"},
2931 {"mode": "w+b", "buffering": 0},
2932 ]:
2933 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2934 with self.open(support.TESTFN, **kwargs) as f:
2935 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2936
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002937 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2938 def test_nonblock_pipe_write_bigbuf(self):
2939 self._test_nonblock_pipe_write(16*1024)
2940
2941 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2942 def test_nonblock_pipe_write_smallbuf(self):
2943 self._test_nonblock_pipe_write(1024)
2944
2945 def _set_non_blocking(self, fd):
2946 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2947 self.assertNotEqual(flags, -1)
2948 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2949 self.assertEqual(res, 0)
2950
2951 def _test_nonblock_pipe_write(self, bufsize):
2952 sent = []
2953 received = []
2954 r, w = os.pipe()
2955 self._set_non_blocking(r)
2956 self._set_non_blocking(w)
2957
2958 # To exercise all code paths in the C implementation we need
2959 # to play with buffer sizes. For instance, if we choose a
2960 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2961 # then we will never get a partial write of the buffer.
2962 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2963 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2964
2965 with rf, wf:
2966 for N in 9999, 73, 7574:
2967 try:
2968 i = 0
2969 while True:
2970 msg = bytes([i % 26 + 97]) * N
2971 sent.append(msg)
2972 wf.write(msg)
2973 i += 1
2974
2975 except self.BlockingIOError as e:
2976 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01002977 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002978 sent[-1] = sent[-1][:e.characters_written]
2979 received.append(rf.read())
2980 msg = b'BLOCKED'
2981 wf.write(msg)
2982 sent.append(msg)
2983
2984 while True:
2985 try:
2986 wf.flush()
2987 break
2988 except self.BlockingIOError as e:
2989 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01002990 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002991 self.assertEqual(e.characters_written, 0)
2992 received.append(rf.read())
2993
2994 received += iter(rf.read, None)
2995
2996 sent, received = b''.join(sent), b''.join(received)
2997 self.assertTrue(sent == received)
2998 self.assertTrue(wf.closed)
2999 self.assertTrue(rf.closed)
3000
Charles-François Natalidc3044c2012-01-09 22:40:02 +01003001 def test_create_fail(self):
3002 # 'x' mode fails if file is existing
3003 with self.open(support.TESTFN, 'w'):
3004 pass
3005 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
3006
3007 def test_create_writes(self):
3008 # 'x' mode opens for writing
3009 with self.open(support.TESTFN, 'xb') as f:
3010 f.write(b"spam")
3011 with self.open(support.TESTFN, 'rb') as f:
3012 self.assertEqual(b"spam", f.read())
3013
Christian Heimes7b648752012-09-10 14:48:43 +02003014 def test_open_allargs(self):
3015 # there used to be a buffer overflow in the parser for rawmode
3016 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
3017
3018
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003019class CMiscIOTest(MiscIOTest):
3020 io = io
3021
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03003022 def test_readinto_buffer_overflow(self):
3023 # Issue #18025
3024 class BadReader(self.io.BufferedIOBase):
3025 def read(self, n=-1):
3026 return b'x' * 10**6
3027 bufio = BadReader()
3028 b = bytearray(2)
3029 self.assertRaises(ValueError, bufio.readinto, b)
3030
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003031class PyMiscIOTest(MiscIOTest):
3032 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003033
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003034
3035@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3036class SignalsTest(unittest.TestCase):
3037
3038 def setUp(self):
3039 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3040
3041 def tearDown(self):
3042 signal.signal(signal.SIGALRM, self.oldalrm)
3043
3044 def alarm_interrupt(self, sig, frame):
3045 1/0
3046
3047 @unittest.skipUnless(threading, 'Threading required for this test.')
3048 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3049 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00003050 invokes the signal handler, and bubbles up the exception raised
3051 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003052 read_results = []
3053 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003054 if hasattr(signal, 'pthread_sigmask'):
3055 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003056 s = os.read(r, 1)
3057 read_results.append(s)
3058 t = threading.Thread(target=_read)
3059 t.daemon = True
3060 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003061 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003062 try:
3063 wio = self.io.open(w, **fdopen_kwargs)
3064 t.start()
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003065 signal.alarm(1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003066 # Fill the pipe enough that the write will be blocking.
3067 # It will be interrupted by the timer armed above. Since the
3068 # other thread has read one byte, the low-level write will
3069 # return with a successful (partial) result rather than an EINTR.
3070 # The buffered IO layer must check for pending signal
3071 # handlers, which in this case will invoke alarm_interrupt().
3072 self.assertRaises(ZeroDivisionError,
Antoine Pitroue1a16742013-04-24 23:31:38 +02003073 wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003074 t.join()
3075 # We got one byte, get another one and check that it isn't a
3076 # repeat of the first one.
3077 read_results.append(os.read(r, 1))
3078 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3079 finally:
3080 os.close(w)
3081 os.close(r)
3082 # This is deliberate. If we didn't close the file descriptor
3083 # before closing wio, wio would try to flush its internal
3084 # buffer, and block again.
3085 try:
3086 wio.close()
3087 except IOError as e:
3088 if e.errno != errno.EBADF:
3089 raise
3090
3091 def test_interrupted_write_unbuffered(self):
3092 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3093
3094 def test_interrupted_write_buffered(self):
3095 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3096
3097 def test_interrupted_write_text(self):
3098 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3099
Brett Cannon31f59292011-02-21 19:29:56 +00003100 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003101 def check_reentrant_write(self, data, **fdopen_kwargs):
3102 def on_alarm(*args):
3103 # Will be called reentrantly from the same thread
3104 wio.write(data)
3105 1/0
3106 signal.signal(signal.SIGALRM, on_alarm)
3107 r, w = os.pipe()
3108 wio = self.io.open(w, **fdopen_kwargs)
3109 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003110 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003111 # Either the reentrant call to wio.write() fails with RuntimeError,
3112 # or the signal handler raises ZeroDivisionError.
3113 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3114 while 1:
3115 for i in range(100):
3116 wio.write(data)
3117 wio.flush()
3118 # Make sure the buffer doesn't fill up and block further writes
3119 os.read(r, len(data) * 100)
3120 exc = cm.exception
3121 if isinstance(exc, RuntimeError):
3122 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3123 finally:
3124 wio.close()
3125 os.close(r)
3126
3127 def test_reentrant_write_buffered(self):
3128 self.check_reentrant_write(b"xy", mode="wb")
3129
3130 def test_reentrant_write_text(self):
3131 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3132
Antoine Pitrou707ce822011-02-25 21:24:11 +00003133 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3134 """Check that a buffered read, when it gets interrupted (either
3135 returning a partial result or EINTR), properly invokes the signal
3136 handler and retries if the latter returned successfully."""
3137 r, w = os.pipe()
3138 fdopen_kwargs["closefd"] = False
3139 def alarm_handler(sig, frame):
3140 os.write(w, b"bar")
3141 signal.signal(signal.SIGALRM, alarm_handler)
3142 try:
3143 rio = self.io.open(r, **fdopen_kwargs)
3144 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003145 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003146 # Expected behaviour:
3147 # - first raw read() returns partial b"foo"
3148 # - second raw read() returns EINTR
3149 # - third raw read() returns b"bar"
3150 self.assertEqual(decode(rio.read(6)), "foobar")
3151 finally:
3152 rio.close()
3153 os.close(w)
3154 os.close(r)
3155
Antoine Pitrou20db5112011-08-19 20:32:34 +02003156 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003157 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3158 mode="rb")
3159
Antoine Pitrou20db5112011-08-19 20:32:34 +02003160 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003161 self.check_interrupted_read_retry(lambda x: x,
3162 mode="r")
3163
3164 @unittest.skipUnless(threading, 'Threading required for this test.')
3165 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3166 """Check that a buffered write, when it gets interrupted (either
3167 returning a partial result or EINTR), properly invokes the signal
3168 handler and retries if the latter returned successfully."""
3169 select = support.import_module("select")
3170 # A quantity that exceeds the buffer size of an anonymous pipe's
3171 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02003172 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00003173 r, w = os.pipe()
3174 fdopen_kwargs["closefd"] = False
3175 # We need a separate thread to read from the pipe and allow the
3176 # write() to finish. This thread is started after the SIGALRM is
3177 # received (forcing a first EINTR in write()).
3178 read_results = []
3179 write_finished = False
3180 def _read():
3181 while not write_finished:
3182 while r in select.select([r], [], [], 1.0)[0]:
3183 s = os.read(r, 1024)
3184 read_results.append(s)
3185 t = threading.Thread(target=_read)
3186 t.daemon = True
3187 def alarm1(sig, frame):
3188 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003189 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003190 def alarm2(sig, frame):
3191 t.start()
3192 signal.signal(signal.SIGALRM, alarm1)
3193 try:
3194 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003195 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003196 # Expected behaviour:
3197 # - first raw write() is partial (because of the limited pipe buffer
3198 # and the first alarm)
3199 # - second raw write() returns EINTR (because of the second alarm)
3200 # - subsequent write()s are successful (either partial or complete)
3201 self.assertEqual(N, wio.write(item * N))
3202 wio.flush()
3203 write_finished = True
3204 t.join()
3205 self.assertEqual(N, sum(len(x) for x in read_results))
3206 finally:
3207 write_finished = True
3208 os.close(w)
3209 os.close(r)
3210 # This is deliberate. If we didn't close the file descriptor
3211 # before closing wio, wio would try to flush its internal
3212 # buffer, and could block (in case of failure).
3213 try:
3214 wio.close()
3215 except IOError as e:
3216 if e.errno != errno.EBADF:
3217 raise
3218
Antoine Pitrou20db5112011-08-19 20:32:34 +02003219 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003220 self.check_interrupted_write_retry(b"x", mode="wb")
3221
Antoine Pitrou20db5112011-08-19 20:32:34 +02003222 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003223 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3224
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003225
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003226class CSignalsTest(SignalsTest):
3227 io = io
3228
3229class PySignalsTest(SignalsTest):
3230 io = pyio
3231
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003232 # Handling reentrancy issues would slow down _pyio even more, so the
3233 # tests are disabled.
3234 test_reentrant_write_buffered = None
3235 test_reentrant_write_text = None
3236
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003237
Ezio Melottidaa42c72013-03-23 16:30:16 +02003238def load_tests(*args):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003239 tests = (CIOTest, PyIOTest,
3240 CBufferedReaderTest, PyBufferedReaderTest,
3241 CBufferedWriterTest, PyBufferedWriterTest,
3242 CBufferedRWPairTest, PyBufferedRWPairTest,
3243 CBufferedRandomTest, PyBufferedRandomTest,
3244 StatefulIncrementalDecoderTest,
3245 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3246 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003247 CMiscIOTest, PyMiscIOTest,
3248 CSignalsTest, PySignalsTest,
3249 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003250
3251 # Put the namespaces of the IO module we are testing and some useful mock
3252 # classes in the __dict__ of each test.
3253 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003254 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003255 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3256 c_io_ns = {name : getattr(io, name) for name in all_members}
3257 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3258 globs = globals()
3259 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3260 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3261 # Avoid turning open into a bound method.
3262 py_io_ns["open"] = pyio.OpenWrapper
3263 for test in tests:
3264 if test.__name__.startswith("C"):
3265 for name, obj in c_io_ns.items():
3266 setattr(test, name, obj)
3267 elif test.__name__.startswith("Py"):
3268 for name, obj in py_io_ns.items():
3269 setattr(test, name, obj)
3270
Ezio Melottidaa42c72013-03-23 16:30:16 +02003271 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
3272 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00003273
3274if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02003275 unittest.main()