blob: 1cf05276dd5022088b13ac907e674bb66f079ea9 [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Serhiy Storchaka441d30f2013-01-19 12:26:26 +020035import _testcapi
Antoine Pitrou131a4892012-10-16 22:57:11 +020036from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000038from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010047try:
48 import fcntl
49except ImportError:
50 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
def _default_chunk_size():
    """Return the chunk size reported by a newly opened text stream.

    The value is read from the ``_CHUNK_SIZE`` attribute of a text file
    object opened on this very source file.
    """
    with open(__file__, "r", encoding="latin-1") as stream:
        chunk_size = stream._CHUNK_SIZE
    return chunk_size
56
57
class MockRawIOWithoutRead:
    """Raw stream mock that deliberately provides no read() method.

    Reading is only possible through readinto(), so the default
    RawIO.read() (which calls readinto()) gets exercised.  The chunks to
    serve are taken, in order, from *read_stack*; a ``None`` entry makes
    one readinto() call return ``None``.
    """

    def __init__(self, read_stack=()):
        self._reads = 0
        self._extraneous_reads = 0
        self._read_stack = list(read_stack)
        self._write_stack = []

    # -- capabilities -----------------------------------------------------

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    # -- positioning (fake) -----------------------------------------------

    def seek(self, pos, whence):
        # wrong but we gotta return something
        return 0

    def tell(self):
        # just as wrong as seek(), for the same reason
        return 0

    def truncate(self, pos=None):
        return pos

    # -- data transfer ----------------------------------------------------

    def write(self, b):
        """Record a bytes copy of *b* and report len(b) as written."""
        snapshot = bytes(b)
        self._write_stack.append(snapshot)
        return len(b)

    def readinto(self, buf):
        """Copy the next queued chunk into *buf*.

        Returns the number of bytes stored, ``None`` when the next queued
        entry is ``None``, or 0 (counted as an extraneous read) once the
        queue is empty.  A chunk larger than *buf* is served piecewise.
        """
        self._reads += 1
        room = len(buf)
        pending = self._read_stack
        if not pending:
            self._extraneous_reads += 1
            return 0
        chunk = pending[0]
        if chunk is None:
            pending.pop(0)
            return None
        size = len(chunk)
        if size > room:
            # Fill the whole buffer and keep the remainder for later calls.
            buf[:] = chunk[:room]
            pending[0] = chunk[room:]
            return room
        pending.pop(0)
        buf[:size] = chunk
        return size
113
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead combined with the C module's RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead combined with the _pyio module's RawIOBase."""
119
120
class MockRawIO(MockRawIOWithoutRead):
    """Raw stream mock whose read() hands out the queued chunks verbatim."""

    def read(self, n=None):
        """Return the next queued chunk, whatever its size.

        *n* is accepted for signature compatibility but ignored.  Once the
        queue is empty the call is counted as an extraneous read and b""
        is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue is expected here; anything else
            # (e.g. KeyboardInterrupt) must not be silently swallowed.
            self._extraneous_reads += 1
            return b""
130
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with the C module's RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with the _pyio module's RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000136
Guido van Rossuma9e20242007-03-08 00:43:48 +0000137
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately bogus results."""

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def write(self, b):
        # Claim twice as many bytes as the parent actually recorded.
        reported = super().write(b)
        return reported * 2

    def read(self, n=None):
        # Hand back the parent's chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def readinto(self, buf):
        # Let the parent fill the buffer, then report an impossible count.
        super().readinto(buf)
        capacity = len(buf)
        return capacity * 5
154
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with the C module's RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with the _pyio module's RawIOBase."""
160
161
class CloseFailureIO(MockRawIO):
    """MockRawIO variant whose first close() call fails with IOError."""

    closed = 0

    def close(self):
        # Later calls are silent no-ops; only the first one raises, after
        # the stream has already been flagged as closed.
        if self.closed:
            return
        self.closed = 1
        raise IOError
169
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with the C module's RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with the _pyio module's RawIOBase."""
175
176
class MockFileIO:
    """Mixin that logs the outcome of every read()/readinto() call.

    It must be combined with a concrete stream class; the results are
    appended to the ``read_history`` list.
    """

    def __init__(self, data):
        # The history list must exist before the real stream is set up.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            logged = None
        else:
            logged = len(chunk)
        self.read_history.append(logged)
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000192
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO layered over the C module's BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO layered over the _pyio module's BytesIO."""
198
199
class MockUnseekableIO:
    """Mixin turning a stream into one that refuses to seek or tell.

    The concrete subclass must provide an ``UnsupportedOperation``
    attribute holding the exception class to raise.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
209
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """Unseekable BytesIO built on the C module."""

    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """Unseekable BytesIO built on the _pyio module."""

    UnsupportedOperation = pyio.UnsupportedOperation
215
216
class MockNonBlockWriterIO:
    """Writable raw-stream mock that can pretend a write would block.

    Written data is accumulated internally and retrieved (and reset) with
    pop_written().  After block_on(char), a write containing *char* only
    accepts the bytes in front of it; a write starting with *char* returns
    ``None`` and disarms the blocker.
    """

    def __init__(self):
        self._blocker_char = None
        self._write_stack = []

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def pop_written(self):
        """Return all data written so far and empty the internal log."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def write(self, b):
        payload = bytes(b)
        blocker = self._blocker_char
        # -1 means "no blocker armed" or "blocker not present in payload".
        cut = payload.find(blocker) if blocker else -1
        if cut > 0:
            # write data up to the first blocker
            self._write_stack.append(payload[:cut])
            return cut
        if cut == 0:
            # cancel blocker and indicate would block
            self._blocker_char = None
            return None
        self._write_stack.append(payload)
        return len(payload)
260
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO combined with the C module's RawIOBase."""

    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO combined with the _pyio module's RawIOBase."""

    BlockingIOError = pyio.BlockingIOError
266
Guido van Rossuma9e20242007-03-08 00:43:48 +0000267
class IOTest(unittest.TestCase):
    # General tests of open() and of the basic stream classes.
    #
    # The methods refer to the objects under test through attributes of
    # ``self`` (self.open, self.FileIO, self.BytesIO, self.IOBase, ...).
    # NOTE(review): those attributes are not defined in this class; they
    # are presumably injected on the concrete subclasses elsewhere in the
    # file -- confirm before running this class on its own.

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate scenario on the writable *f*.

        On success the stream content is b"hello world\\n" and the stream
        position is 13.
        """
        self.assertEqual(f.write(b"blah."), 5)
        # Truncating must not move the stream position.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/seek scenario on *f* holding b"hello world\\n".

        When *buffered* is true, argument-less read() calls are checked as
        well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Only two bytes are left: the buffer keeps its size and only its
        # first two bytes are overwritten.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file scenario.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Run seek/write/truncate checks around offset LARGE on *f*.

        *f* must be both readable and writable.
        """
        # Use unittest assertions rather than bare ``assert`` statements,
        # which are compiled away when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # Truncating must not move the stream position.
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_open_handles_NUL_chars(self):
        # A NUL character in the file name must be rejected with TypeError,
        # for str and bytes names alike.
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
        self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a real skip (as test_unbounded_file does) instead
                # of printing to stderr and letting the test count as a pass.
                self.skipTest(
                    "large file ops on %s require %d bytes and a long time; "
                    "use 'regrtest.py -u largefile test_io' to run them"
                    % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed when the with block is left, whether
        # normally or through an exception.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            # assertGreater reports the offending value on failure.
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Deleting an open FileIO must run __del__, close() and flush(),
        # in that order, and the written data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check the __del__/close()/flush() call order for *base*.

        A subclass of *base* overriding the three methods is instantiated
        and deleted; each override records one of the instance attributes
        set in __init__.
        """
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must report the byte size of the array, for unbuffered
        # and buffered files alike.
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Put the object into a reference cycle with itself.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed
        self.assertTrue(f.closed)

    def test_multi_close(self):
        # Closing repeatedly is allowed; flushing a closed file is not.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))

    def test_opener(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        fd = os.open(support.TESTFN, os.O_RDONLY)
        # The opener ignores the requested path and hands out the
        # descriptor opened above.
        def opener(path, flags):
            return fd
        with self.open("non-existent", "r", opener=opener) as f:
            self.assertEqual(f.read(), "egg\n")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()
660
661
class CIOTest(IOTest):
    # IOTest plus one finalization regression test; the issue it covers
    # mentions _PyIOBase_finalize.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # Instantiate once (and drop the object right away) so that the
        # method cache gets populated.
        MyIO()
        cyclic = MyIO()
        cyclic.obj = cyclic
        ref = weakref.ref(cyclic)
        del MyIO, cyclic
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000681
class PyIOTest(IOTest):
    """IOTest without any additional test of its own."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000684
Guido van Rossuma9e20242007-03-08 00:43:48 +0000685
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # The buffered class under test is read from ``self.tp``; the raw
    # stream mocks are read from ``self.MockRawIO`` / ``self.CloseFailureIO``.

    def test_detach(self):
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        self.assertIs(buffered.detach(), raw_stream)
        # A second detach() has nothing left to hand out.
        with self.assertRaises(ValueError):
            buffered.detach()

    def test_fileno(self):
        buffered = self.tp(self.MockRawIO())
        self.assertEqual(42, buffered.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        buffered = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 9):
            self.assertRaises(ValueError, buffered.seek, 0, bad_whence)

    def test_override_destructor(self):
        calls = []
        base = self.tp
        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                try:
                    parent_del = super().__del__
                except AttributeError:
                    pass
                else:
                    parent_del()
            def close(self):
                calls.append(2)
                super().close()
            def flush(self):
                calls.append(3)
                super().flush()
        bufio = MyBufferedIO(self.MockRawIO())
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        # flush() is only expected in the record for writable objects.
        expected = [1, 2, 3] if writable else [1, 2]
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        buffered = self.tp(self.MockRawIO())
        def enter_and_leave():
            with buffered:
                pass
        enter_and_leave()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, enter_and_leave)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        failing_raw = self.CloseFailureIO()
        def touch_missing_attribute():
            self.tp(failing_raw).xyzzy
        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        output = captured.getvalue().strip()
        if output:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(output.splitlines()), 1)
            self.assertTrue(output.startswith("Exception IOError: "), output)
            self.assertTrue(output.endswith(" ignored"), output)
Guido van Rossum78892e42007-04-06 17:31:18 +0000767
Antoine Pitrou716c4442009-05-23 19:04:03 +0000768 def test_repr(self):
769 raw = self.MockRawIO()
770 b = self.tp(raw)
771 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
772 self.assertEqual(repr(b), "<%s>" % clsname)
773 raw.name = "dummy"
774 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
775 raw.name = b"dummy"
776 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
777
Antoine Pitrou6be88762010-05-03 16:48:20 +0000778 def test_flush_error_on_close(self):
779 raw = self.MockRawIO()
780 def bad_flush():
781 raise IOError()
782 raw.flush = bad_flush
783 b = self.tp(raw)
784 self.assertRaises(IOError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600785 self.assertTrue(b.closed)
786
787 def test_close_error_on_close(self):
788 raw = self.MockRawIO()
789 def bad_flush():
790 raise IOError('flush')
791 def bad_close():
792 raise IOError('close')
793 raw.close = bad_close
794 b = self.tp(raw)
795 b.flush = bad_flush
796 with self.assertRaises(IOError) as err: # exception not swallowed
797 b.close()
798 self.assertEqual(err.exception.args, ('close',))
799 self.assertEqual(err.exception.__context__.args, ('flush',))
800 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000801
802 def test_multi_close(self):
803 raw = self.MockRawIO()
804 b = self.tp(raw)
805 b.close()
806 b.close()
807 b.close()
808 self.assertRaises(ValueError, b.flush)
809
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000810 def test_unseekable(self):
811 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
812 self.assertRaises(self.UnsupportedOperation, bufio.tell)
813 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
814
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000815 def test_readonly_attributes(self):
816 raw = self.MockRawIO()
817 buf = self.tp(raw)
818 x = self.MockRawIO()
819 with self.assertRaises(AttributeError):
820 buf.raw = x
821
Guido van Rossum78892e42007-04-06 17:31:18 +0000822
class SizeofTest:
    # Mixin checking that sys.getsizeof() of a buffered object accounts for
    # the size of its buffer.

    @support.cpython_only
    def test_sizeof(self):
        small, large = 4096, 8192
        # Reported size minus the buffer, measured with the small buffer.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=small)
        overhead = sys.getsizeof(bufio) - small
        # With the large buffer the reported size must be that same
        # overhead plus the new buffer size.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=large)
        self.assertEqual(sys.getsizeof(bufio), overhead + large)
835
836
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests for buffered readers.  ``tp`` (the class under test) is assigned
    # by the implementation-specific subclasses.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be re-run with valid buffer sizes; zero or negative
        # sizes raise ValueError, after which a fresh __init__ works again.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for kwargs in ({}, {"buffer_size": 1024}, {"buffer_size": 16}):
            bufio.__init__(rawio, **kwargs)
        self.assertEqual(b"abc", bufio.read())
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                bufio.__init__(rawio, buffer_size=bad_size)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        # read(None) and read(7) both return all seven bytes.
        for size in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(size))
        # Invalid args
        with self.assertRaises(ValueError):
            bufio.read(-2)

    def test_read1(self):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        # (size passed to read1, expected data, expected rawio._reads)
        steps = (
            (1, b"b", 1),
            (100, b"c", 1),
            (100, b"d", 2),
            (100, b"efg", 3),
            (100, b"", 4),
        )
        for size, expected, raw_reads in steps:
            self.assertEqual(expected, bufio.read1(size))
            self.assertEqual(rawio._reads, raw_reads)
        # Invalid args
        with self.assertRaises(ValueError):
            bufio.read1(-1)

    def test_readinto(self):
        # readinto() fills a 2-byte buffer; a short read leaves the tail
        # of the buffer untouched.
        target = bytearray(2)
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        for count_read, content in ((2, b"ab"), (2, b"cd"), (2, b"ef"),
                                    (1, b"gf"), (0, b"gf")):
            self.assertEqual(bufio.readinto(target), count_read)
            self.assertEqual(target, content)
        # Same check with a raw stream that yields None after its data.
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        for count_read, content in ((2, b"ab"), (1, b"cb")):
            self.assertEqual(bufio.readinto(target), count_read)
            self.assertEqual(target, content)

    def test_readlines(self):
        # readlines() with no hint, a hint of 5 and a hint of None.
        def fresh_reader():
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(fresh_reader().readlines(),
                         [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(fresh_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh_reader().readlines(None),
                         [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # (buffer size, sizes passed to bufio.read, expected raw read sizes)
        scenarios = (
            (100, (3, 1, 4, 8), [dlen, 0]),
            (100, (3, 3, 3), [dlen]),
            (4, (1, 2, 4, 2), [4, 4, 1]),
        )

        for bufsize, buf_read_sizes, raw_read_sizes in scenarios:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            offset = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes),
                                 data[offset:offset + nbytes])
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # The raw mock's own readall() gets the same treatment.
        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for far more than is available returns what there is.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        # read() with no argument returns everything.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(values)
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for size in cycle([1, 19]):
                            chunk = bufio.read(size)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise
                threads = [threading.Thread(target=reader) for _ in range(20)]
                for thread in threads:
                    thread.start()
                time.sleep(0.02) # yield
                for thread in threads:
                    thread.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Every byte value must have been read exactly N times
                # across all threads.
                combined = b''.join(results)
                for value in range(256):
                    self.assertEqual(combined.count(bytes([value])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # tell()/seek() raise UnsupportedOperation both before and after
        # some data has been read.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            bufio.tell()
        with self.assertRaises(self.UnsupportedOperation):
            bufio.seek(0)
        bufio.read(1)
        with self.assertRaises(self.UnsupportedOperation):
            bufio.seek(0)
        with self.assertRaises(self.UnsupportedOperation):
            bufio.tell()

    def test_misbehaved_io(self):
        # seek() and tell() on top of MisbehavedRawIO raise IOError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        with self.assertRaises(IOError):
            bufio.seek(0)
        with self.assertRaises(IOError):
            bufio.tell()

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            # First the simple case where one raw read satisfies the
            # request, then a more complex one needing two raw reads.
            for chunks in ([b"x" * n], [b"x" * (n - 1), b"x"]):
                rawio = self.MockRawIO(chunks)
                bufio = self.tp(rawio, bufsize)
                self.assertEqual(bufio.read(n), b"x" * n)
                self.assertEqual(rawio._extraneous_reads, 0,
                    "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1032
1033
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the buffered-reader tests against io.BufferedReader and adds
    # checks that only apply to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            # A buffer of sys.maxsize bytes must be refused one way or
            # another.
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every rejected __init__ the object must refuse to read
        # (ValueError).
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The FileIO below creates TESTFN on disk; remove it once the test
        # is over, whether it passed or not.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Self-reference so that only the cycle collector can free it.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001074
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the buffered-reader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001077
Guido van Rossuma9e20242007-03-08 00:43:48 +00001078
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Tests for buffered writers.  ``tp`` (the class under test) is assigned
    # by the implementation-specific subclasses.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be re-run with valid buffer sizes; zero or negative
        # sizes raise ValueError, after which a fresh __init__ makes the
        # writer usable again.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        # Both writes reached the raw stream, in order.
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must flush pending data to the raw stream first.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        # Nothing may have reached the raw stream yet.
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        # Writing more than the buffer holds forces implicit flushes.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Helper: perform lots of writes of growing size, calling
        # intermediate_func(bufio) after each one, and check that the
        # flushed output equals what was written.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Never write past the end of ``contents``.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        # Plain writes, nothing in between.
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        # An explicit flush() after every write.
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seeking away and back between writes must not disturb the output;
        # checked with absolute and then with relative seeks.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        # Truncating at the current position between writes.
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Overwrite the start of the data after seeking back, then append
        # after seeking forward again.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        # flush() pushes buffered data to the raw stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        # writelines() with a plain list of bytes objects.
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # writelines() with a UserList instead of a real list.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Ints, None and a str are all rejected with TypeError.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Destroying the writer must flush its pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN on disk; remove it once the test is over,
        # whether it passed or not.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # Truncating does not move the stream position.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            # Pre-cut the data into chunks the worker threads pop from.
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunks may land in any order, but every byte value must occur
            # exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek(), tell() and an overflowing write() on top of
        # MisbehavedRawIO all raise IOError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        # A failing raw write() during close() must surface, and the
        # buffered object must still end up closed.
        raw = self.MockRawIO()
        def bad_write(b):
            raise IOError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(IOError, b.close) # exception not swallowed
        self.assertTrue(b.closed)
1326
Benjamin Peterson59406a92009-03-26 17:10:29 +00001327
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the buffered-writer tests against io.BufferedWriter and adds
    # checks that only apply to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            # A buffer of sys.maxsize bytes must be refused one way or
            # another.
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After every rejected __init__ the object must refuse to write
        # (ValueError).
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The FileIO below creates TESTFN on disk; remove it once the test
        # is over, whether it passed or not.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Self-reference so that only the cycle collector can free it.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        # The data buffered before collection must now be in the file.
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1365
1366
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the buffered-writer tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001369
class BufferedRWPairTest(unittest.TestCase):
    # Implementation-independent tests for BufferedRWPair.  Subclasses bind
    # ``tp`` to the concrete class under test; ``self.MockRawIO``,
    # ``self.BytesIO`` and ``self.UnsupportedOperation`` are not defined in
    # this class and must be provided from outside it.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        # A pair wraps two raw streams, so detach() is not supported.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument must be rejected.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # None must behave like "no size given".
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # A fresh pair is built for every call so each readlines() starts
        # from the beginning of the data.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # The original repeated the no-argument call here; exercise the
        # hint=None spelling instead, mirroring read(None) in test_read.
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        # Each write is flushed separately, so the raw writer must record
        # two distinct chunks.
        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than requested, hence startswith(); it must
        # not consume the data, which the following read() verifies.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            # Raw stream whose isatty() answer is chosen at construction.
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side is one.
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001486
# Runs the shared BufferedRWPairTest suite against ``io.BufferedRWPair``.
class CBufferedRWPairTest(BufferedRWPairTest):
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001489
# Runs the shared BufferedRWPairTest suite against ``pyio.BufferedRWPair``.
class PyBufferedRWPairTest(BufferedRWPairTest):
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001492
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001493
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Tests for a buffered object that is both readable and writable.
    # Both inherited suites are reused, and the tests added here interleave
    # reads, writes, seeks and flushes on the same object.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Both parent classes define this test; run each one explicitly.
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_constructor(self)

    def test_read_and_write(self):
        rawio = self.MockRawIO((b"asdf", b"ghjk"))
        bufio = self.tp(rawio, 8)

        self.assertEqual(b"as", bufio.read(2))
        for chunk in (b"ddd", b"eee"):
            bufio.write(chunk)
        # Nothing may have reached the raw stream yet: writes are buffered.
        self.assertFalse(rawio._write_stack)
        self.assertEqual(b"ghjk", bufio.read())
        self.assertEqual(b"dddeee", rawio._write_stack[0])

    def test_seek_and_tell(self):
        rawio = self.BytesIO(b"asdfghjkl")
        bufio = self.tp(rawio)

        self.assertEqual(b"as", bufio.read(2))
        self.assertEqual(2, bufio.tell())
        bufio.seek(0, 0)
        self.assertEqual(b"asdf", bufio.read(4))

        # Overwrite four bytes in the middle, then re-read everything.
        bufio.write(b"123f")
        bufio.seek(0, 0)
        self.assertEqual(b"asdf123fl", bufio.read())
        self.assertEqual(9, bufio.tell())
        # Seek relative to the end, then relative to the current position.
        bufio.seek(-4, 2)
        self.assertEqual(5, bufio.tell())
        bufio.seek(2, 1)
        self.assertEqual(7, bufio.tell())
        self.assertEqual(b"fl", bufio.read(11))
        bufio.flush()
        self.assertEqual(b"asdf123fl", rawio.getvalue())

        # Float offsets are rejected.
        self.assertRaises(TypeError, bufio.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario: ``read_func(bufio[, size])`` is the read primitive
        # being checked in combination with write() and flush().
        rawio = self.BytesIO(b"abcdefghi")
        bufio = self.tp(rawio)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        rawio.seek(0, 0)
        rawio.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        def read_via_read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(read_via_read)

    def test_flush_and_readinto(self):
        def read_via_readinto(bufio, size=-1):
            # With no size given, use a target large enough for the stream.
            capacity = size if size >= 0 else 9999
            target = bytearray(capacity)
            filled = bufio.readinto(target)
            return bytes(target[:filled])
        self.check_flush_and_read(read_via_readinto)

    def test_flush_and_peek(self):
        def read_via_peek(bufio, size=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(size)
            if size != -1:
                data = data[:size]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(read_via_peek)

    def test_flush_and_write(self):
        rawio = self.BytesIO(b"abcdefghi")
        bufio = self.tp(rawio)

        for chunk in (b"123", b"45"):
            bufio.write(chunk)
            bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", rawio.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        # Both parent classes define this test; run each one explicitly.
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_threads(self)

    def test_writes_and_peek(self):
        def peek_in_place(bufio):
            bufio.peek(1)
        self.check_writes(peek_in_place)

        def peek_one_back(bufio):
            # Step back one byte, peek, then restore the position.
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)
        self.check_writes(peek_one_back)

    def test_writes_and_reads(self):
        def reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(reread_last_byte)

    def test_writes_and_read1s(self):
        def reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(reread_last_byte)

    def test_writes_and_readintos(self):
        def reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(reread_last_byte)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            rawio = self.BytesIO(b"A" * 10)
            bufio = self.tp(rawio, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            end_pos = overwrite_size + 1
            self.assertEqual(bufio.tell(), end_pos)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), end_pos)
            contents = rawio.getvalue()
            self.assertEqual(contents,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards /
        # writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for i in range(size):
            for j in range(i, size):
                rawio = self.BytesIO(original)
                bufio = self.tp(rawio, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # Apply the same two writes to a plain copy; the later write
                # (at i) wins when i == j.
                expected = bytearray(original)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(rawio.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        rawio = self.BytesIO(b"A" * 10)
        bufio = self.tp(rawio, 100)
        self.assertEqual(bufio.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        # Both parent classes define this test; run each one explicitly.
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as rawio, self.tp(rawio, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            target = bytearray(1)
            f.readinto(target)
            self.assertEqual(target, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(rawio.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as rawio, self.tp(rawio, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(rawio.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as rawio, self.tp(rawio) as f:
            f.write(b'1')
            self.assertEqual(f.readline(), b'b\n')
            f.write(b'2')
            self.assertEqual(f.readline(), b'def\n')
            f.write(b'3')
            self.assertEqual(f.readline(), b'\n')
            f.flush()
            self.assertEqual(rawio.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1717
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Runs the BufferedRandomTest suite against ``io.BufferedRandom`` and
    # adds checks that chain to the C reader/writer test classes.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with an absurd buffer size must fail cleanly.
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Run the reader and the writer variant of this test in turn.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1734
# Runs the shared BufferedRandomTest suite against ``pyio.BufferedRandom``.
class PyBufferedRandomTest(BufferedRandomTest):
    tp = pyio.BufferedRandom
1737
1738
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001739# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1740# properties:
1741# - A single output character can correspond to many bytes of input.
1742# - The number of input bytes to complete the character can be
1743# undetermined until the last input byte is received.
1744# - The number of input bytes can vary depending on previous input.
1745# - A single input byte can correspond to many characters of output.
1746# - The number of output characters can be undetermined until the
1747# last input byte is received.
1748# - The number of output characters can vary depending on previous input.
1749
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Restore the initial state: I = O = 1 and no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return ``(buffered_bytes, flags)`` describing the decoder state.

        I and O are packed into one integer as ``(I ^ 1) * 100 + (O ^ 1)``.
        """
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        # Undo the packing done in getstate().
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed *input* (an iterable of byte values) and return the decoded text.

        Incomplete words stay buffered unless *final* is true, in which
        case the buffered bytes are emitted as the last word.
        """
        output = ''
        for b in input:
            if self.i == 0:  # variable-length, terminated with period
                if b == ord('.'):
                    # An empty buffer means this is an extra period: ignore.
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer:  # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i...' and 'o...' words update I and O and produce no output; any
        other word is padded/truncated to O characters and followed by '.'.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                # Over-pad; the slice below trims to exactly O characters.
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Gate for lookupTestDecoder(); it only answers while this is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: resolve 'test_decoder' while enabled.

        Returns a CodecInfo whose incremental decoder is this class (and
        whose encoder is latin-1's), or None when disabled or when *name*
        is anything else.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
1834
# Install the decoder's lookup hook as a codec search function.  The hook
# stays inert until a test sets StatefulIncrementalDecoder.codecEnabled.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1838
1839
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001882
1883class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001884
def setUp(self):
    # Make sure no stale test file is left over, then build the fixtures:
    # raw bytes with mixed line endings and their "\n"-only text form.
    support.unlink(support.TESTFN)
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001888 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001889
def tearDown(self):
    # Remove the scratch file a test may have created.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001892
def test_constructor(self):
    rawio = self.BytesIO(b"\xc3\xa9\n\n")
    bufio = self.BufferedReader(rawio, 1000)
    textio = self.TextIOWrapper(bufio)
    # Re-running __init__ must replace the previous settings.
    textio.__init__(bufio, encoding="latin-1", newline="\r\n")
    self.assertEqual(textio.encoding, "latin-1")
    self.assertEqual(textio.line_buffering, False)
    textio.__init__(bufio, encoding="utf-8", line_buffering=True)
    self.assertEqual(textio.encoding, "utf-8")
    self.assertEqual(textio.line_buffering, True)
    self.assertEqual("\xe9\n", textio.readline())
    # Invalid newline arguments: wrong type, then wrong value.
    self.assertRaises(TypeError, textio.__init__, bufio, newline=42)
    self.assertRaises(ValueError, textio.__init__, bufio, newline='xyzzy')
1906
def test_detach(self):
    rawio = self.BytesIO()
    bufio = self.BufferedWriter(rawio)
    textio = self.TextIOWrapper(bufio)
    # detach() hands back the wrapped buffer object itself.
    self.assertIs(textio.detach(), bufio)

    textio = self.TextIOWrapper(bufio, encoding="ascii")
    textio.write("howdy")
    # Nothing has reached the raw stream before detach() ...
    self.assertFalse(rawio.getvalue())
    textio.detach()
    # ... and everything has afterwards.
    self.assertEqual(rawio.getvalue(), b"howdy")
    # A second detach() on the same wrapper is an error.
    self.assertRaises(ValueError, textio.detach)
1919
def test_repr(self):
    rawio = self.BytesIO("hello".encode("utf-8"))
    bufio = self.BufferedReader(rawio)
    textio = self.TextIOWrapper(bufio, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    # Without a name or mode only the encoding is shown.
    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(textio), expected)

    # A name set on the raw stream shows up in the repr.
    rawio.name = "dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(textio), expected)

    # So does a mode set on the wrapper.
    textio.mode = "r"
    expected = ("<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(textio), expected)

    # A bytes name is rendered with its b'' prefix.
    rawio.name = b"dummy"
    expected = ("<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(textio), expected)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001936
def test_line_buffering(self):
    rawio = self.BytesIO()
    bufio = self.BufferedWriter(rawio, 1000)
    textio = self.TextIOWrapper(bufio, newline="\n", line_buffering=True)
    textio.write("X")
    self.assertEqual(rawio.getvalue(), b"")  # No flush happened
    textio.write("Y\nZ")
    self.assertEqual(rawio.getvalue(), b"XY\nZ")  # All got flushed
    # The text written here contains "\r" and is expected to be flushed too.
    textio.write("A\rB")
    self.assertEqual(rawio.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001947
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for name in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(name, None)

        expected = locale.getpreferredencoding(False)
        textio = self.TextIOWrapper(self.BytesIO())
        self.assertEqual(textio.encoding, expected)
    finally:
        # Put the environment back exactly as it was.
        os.environ.clear()
        os.environ.update(saved_environ)
1965
# Issue 15989
def test_device_encoding(self):
    # A fileno() just past INT_MAX or UINT_MAX must raise OverflowError
    # when the wrapper is constructed.
    rawio = self.BytesIO()
    for limit in (_testcapi.INT_MAX, _testcapi.UINT_MAX):
        rawio.fileno = lambda limit=limit: limit + 1
        with self.assertRaises(OverflowError):
            self.TextIOWrapper(rawio)
1973
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="utf-8")
    self.assertEqual(t.encoding, "utf-8")
    # With no explicit encoding a default must still be chosen ...
    t = self.TextIOWrapper(b)
    self.assertIsNotNone(t.encoding)
    # ... and it must name a codec that exists (lookup raises otherwise).
    codecs.lookup(t.encoding)
1982
def test_encoding_errors_reading(self):
    def wrap(**kwargs):
        # Fresh ASCII wrapper over data containing one undecodable byte.
        return self.TextIOWrapper(self.BytesIO(b"abc\n\xff\n"),
                                  encoding="ascii", **kwargs)

    # (1) default
    self.assertRaises(UnicodeError, wrap().read)
    # (2) explicit strict
    self.assertRaises(UnicodeError, wrap(errors="strict").read)
    # (3) ignore
    self.assertEqual(wrap(errors="ignore").read(), "abc\n\n")
    # (4) replace
    self.assertEqual(wrap(errors="replace").read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002000
def test_encoding_errors_writing(self):
    # (1) default, (2) explicit strict: an unencodable character raises.
    for kwargs in ({}, {"errors": "strict"}):
        textio = self.TextIOWrapper(self.BytesIO(), encoding="ascii",
                                    **kwargs)
        self.assertRaises(UnicodeError, textio.write, "\xff")

    # (3) ignore drops the character, (4) replace substitutes "?".
    lenient_cases = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for errors, expected in lenient_cases:
        rawio = self.BytesIO()
        textio = self.TextIOWrapper(rawio, encoding="ascii", errors=errors,
                                    newline="\n")
        textio.write("abc\xffdef\n")
        textio.flush()
        self.assertEqual(rawio.getvalue(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002024
def test_newlines(self):
    # Read the same text back under every ``newline`` setting, for several
    # encodings and buffer sizes, both by iterating over the wrapper and by
    # mixing read(2) with readline().
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

    # Pairs of (newline argument, lines expected when reading).
    tests = [
        [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
        ['', input_lines],
        ['\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]],
        ['\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]],
        ['\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = ''.join(input_lines).encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        # Take two characters, then the rest of the line.
                        got_lines = []
                        while True:
                            c2 = textio.read(2)
                            if c2 == '':
                                break
                            self.assertEqual(len(c2), 2)
                            got_lines.append(c2 + textio.readline())
                    else:
                        got_lines = list(textio)

                    # One list comparison checks both content and length;
                    # the message pins down the failing combination.
                    self.assertEqual(
                        got_lines, exp_lines,
                        "encoding=%r do_reads=%r bufsize=%r newline=%r"
                        % (encoding, do_reads, bufsize, newline))
Guido van Rossum78892e42007-04-06 17:31:18 +00002066
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # Pairs of (newline argument, lines expected from readlines()).
    cases = [
        (None, normalized.decode("ascii").splitlines(keepends=True)),
        ("", testdata.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        rawio = self.BytesIO(testdata)
        textio = self.TextIOWrapper(rawio, encoding="ascii", newline=newline)
        self.assertEqual(textio.readlines(), expected)
        # Reading everything at once must give the same text.
        textio.seek(0)
        self.assertEqual(textio.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002082
def test_newlines_output(self):
    # Expected raw bytes for each explicit ``newline`` argument.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to match the os.linesep entry.
    tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
    chunks = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        rawio = self.BytesIO()
        textio = self.TextIOWrapper(rawio, encoding="ascii", newline=newline)
        for chunk in chunks:
            textio.write(chunk)
        textio.flush()
        self.assertEqual(rawio.closed, False)
        self.assertEqual(rawio.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002100
def test_destructor(self):
    # Dropping the last reference to the wrapper must close the underlying
    # buffer, and the written text must be in the buffer by that time.
    seen_at_close = []
    parent = self.BytesIO

    class MyBytesIO(parent):
        def close(self):
            # Capture the contents before the real close discards them.
            seen_at_close.append(self.getvalue())
            parent.close(self)

    raw = MyBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002114
def test_override_destructor(self):
    # Overridden __del__, close() and flush() must run in that order when
    # the wrapper is destroyed.
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # Chain to the parent's finalizer only when it defines one.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2137
def test_error_through_destructor(self):
    # The AttributeError raised below must reach the caller unchanged, even
    # though the temporary wrapper's close() fails while it is destroyed.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        with self.assertRaises(AttributeError):
            touch_missing_attribute()
    output = captured.getvalue().strip()
    if not output:
        return
    # The destructor *may* have printed an unraisable error; if it did, it
    # must be a single line of the expected form.
    self.assertEqual(len(output.splitlines()), 1)
    self.assertTrue(output.startswith("Exception IOError: "), output)
    self.assertTrue(output.endswith(" ignored"), output)
Guido van Rossum8358db22007-08-18 21:39:55 +00002152
Guido van Rossum9b76da62007-04-11 01:09:03 +00002153 # Systematic tests of the text I/O API
2154
def test_basic_io(self):
    # Round-trip write/read/seek/tell through a text file for a range of
    # decoder chunk sizes and encodings.
    #
    # The files are managed by ``with`` blocks so the handle is closed even
    # when one of the assertions fails; previously close() was only reached
    # on the success path.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Opaque position of end-of-file, reused below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2183
def multi_line_test(self, f, enc):
    # Empty f, write lines of many different lengths while noting tell()
    # before each write, then check that readline() returns the same
    # (position, line) pairs.  The enc argument is not used here.
    f.seek(0)
    f.truncate()
    alphabet = "s\xff\u0fff\uffff"
    written = []
    for length in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        text = "".join(alphabet[k % len(alphabet)] for k in range(length))
        text += "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    read_back = []
    while True:
        where = f.tell()
        text = f.readline()
        if not text:
            break
        read_back.append((where, text))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002205
def test_telling(self):
    # tell() must report the same positions while reading lines back as it
    # did while writing them, and must raise IOError during iteration.
    #
    # The file is managed by ``with`` so it is closed even when an
    # assertion fails; previously close() was only reached on success.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            self.assertRaises(IOError, f.tell)
        # Once the iteration is exhausted tell() works again.
        self.assertEqual(f.tell(), p2)
2225
def test_seeking(self):
    # Build a line whose multi-byte suffix starts two characters before
    # the default chunk size, write it twice, and check read()/tell()/
    # readline() around that point.
    head_len = _default_chunk_size() - 2
    head_text = "a" * head_len
    head_bytes = bytes(head_text.encode("utf-8"))
    self.assertEqual(len(head_text), len(head_bytes))
    tail_text = "\u8888\n"
    tail_bytes = bytes(tail_text.encode("utf-8"))
    payload = head_bytes + tail_bytes
    with self.open(support.TESTFN, "wb") as f:
        f.write(payload*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        got = f.read(head_len)
        self.assertEqual(got, str(head_bytes, "ascii"))
        self.assertEqual(f.tell(), head_len)
        self.assertEqual(f.readline(), tail_text)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002242
def test_seeking_too(self):
    # Regression test for a specific bug: readline() followed by tell()
    # with a chunk size smaller than one encoded character.
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as out:
        out.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002253
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a ``with`` block so a failing assertion
        # cannot leave support.TESTFN open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.  (The loop variable is named ``data`` rather
        # than ``input`` so the builtin is not shadowed.)
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(data)

        # Position each test case so that it crosses a chunk boundary.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(data)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + data, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002300
def test_encoded_writes(self):
    digits = "1234567890"
    doubled = digits * 2
    encodings = (
        "utf-16",
        "utf-16-le",
        "utf-16-be",
        "utf-32",
        "utf-32-le",
        "utf-32-be",
    )
    for encoding in encodings:
        sink = self.BytesIO()
        wrapper = self.TextIOWrapper(sink, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        wrapper.write(digits)
        wrapper.write(digits)
        # Read the text back twice, rewinding before each read.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        self.assertEqual(sink.getvalue(), doubled.encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002320
def test_unreadable(self):
    # read() on a wrapper whose buffer reports readable() == False must
    # raise IOError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(UnReadable())
    with self.assertRaises(IOError):
        wrapper.read()
2327
def test_read_one_by_one(self):
    # Reading one character at a time must still translate "\r\n".
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    collected = ""
    # iter() with a sentinel stops at the first empty read (end of file).
    for char in iter(lambda: wrapper.read(1), ""):
        collected += char
    self.assertEqual(collected, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002337
def test_readlines(self):
    # readlines() with no hint, with None, and with a size hint of 5.
    wrapper = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    everything = ["AA\n", "BB\n", "CC"]
    self.assertEqual(wrapper.readlines(), everything)
    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(None), everything)
    wrapper.seek(0)
    self.assertEqual(wrapper.readlines(5), ["AA\n", "BB\n"])
2345
# Read in amounts equal to TextIOWrapper._CHUNK_SIZE, which is 128.
def test_read_by_chunk(self):
    # Make sure "\r\n" straddles the 128-character boundary.
    wrapper = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    collected = ""
    # iter() with a sentinel stops at the first empty read (end of file).
    for chunk in iter(lambda: wrapper.read(128), ""):
        collected += chunk
    self.assertEqual(collected, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002357
def test_writelines(self):
    # writelines() accepts a plain list of strings.
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(['ab', 'cd', 'ef'])
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2365
def test_writelines_userlist(self):
    # writelines() also accepts a UserList, not just a real list.
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink)
    wrapper.writelines(UserList(['ab', 'cd', 'ef']))
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
2373
def test_writelines_error(self):
    # Arguments that are not iterables of str must raise TypeError.
    wrapper = self.TextIOWrapper(self.BytesIO())
    for bad_lines in ([1, 2, 3], None, b'abc'):
        with self.assertRaises(TypeError):
            wrapper.writelines(bad_lines)
2379
def test_issue1395_1(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")

    # Read one char at a time until an empty read signals end of file.
    collected = ""
    for char in iter(lambda: wrapper.read(1), ""):
        collected += char
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002391
def test_issue1395_2(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Read four chars at a time, matching the chunk size set above.
    collected = ""
    for chunk in iter(lambda: wrapper.read(4), ""):
        collected += chunk
    self.assertEqual(collected, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002403
def test_issue1395_3(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Mix two sized reads with three readline() calls.
    parts = [wrapper.read(4), wrapper.read(4)]
    for _ in range(3):
        parts.append(wrapper.readline())
    self.assertEqual("".join(parts), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002414
def test_issue1395_4(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # A sized read followed by a read of everything that remains.
    first = wrapper.read(4)
    rest = wrapper.read()
    self.assertEqual(first + rest, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002422
def test_issue1395_5(self):
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    wrapper._CHUNK_SIZE = 4

    # Remember the position after the first chunk, rewind, then return
    # to it; the next read must continue from there.
    wrapper.read(4)
    bookmark = wrapper.tell()
    wrapper.seek(0)
    wrapper.seek(bookmark)
    self.assertEqual(wrapper.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002432
def test_issue2282(self):
    # The wrapper must report the same seekable() value as its buffer.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
2438
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
            # The result is not used, but the tell() call is kept.
            pos = f.tell()
        with self.open(path, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(path, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002453
def test_seek_bom(self):
    # Same check as for appending, but positioning manually with seek():
    # writing at the saved end position and then at 0 must leave a single
    # BOM in the file.
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
            end = f.tell()
        with self.open(path, 'r+', encoding=charset) as f:
            f.seek(end)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(path, 'rb') as f:
            self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002468
def test_errors_property(self):
    # The errors attribute is "strict" unless overridden at open time.
    path = support.TESTFN
    with self.open(path, "w") as f:
        self.assertEqual(f.errors, "strict")
    with self.open(path, "w", errors="replace") as f:
        self.assertEqual(f.errors, "replace")
2474
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    go = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Hold every thread until all are started, so the writes
            # happen as close together as possible.
            go.wait()
            f.write(text)

        workers = [threading.Thread(target=run, args=(x,))
                   for x in range(20)]
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        go.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for n in range(20):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002497
def test_flush_error_on_close(self):
    # An error raised by flush() during close() must propagate, and the
    # wrapper must still end up closed.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")

    def bad_flush():
        raise IOError()

    wrapper.flush = bad_flush
    with self.assertRaises(IOError):  # exception not swallowed
        wrapper.close()
    self.assertTrue(wrapper.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002505
def test_multi_close(self):
    # close() may be called repeatedly; flush() afterwards must raise
    # ValueError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    for _ in range(3):
        wrapper.close()
    with self.assertRaises(ValueError):
        wrapper.flush()
2512
def test_unseekable(self):
    # tell() and seek() must raise UnsupportedOperation when the wrapper
    # sits on a MockUnseekableIO.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
2517
def test_readonly_attributes(self):
    # Assigning to the wrapper's buffer attribute must raise
    # AttributeError.
    wrapper = self.TextIOWrapper(self.BytesIO(self.testdata),
                                 encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises(AttributeError):
        wrapper.buffer = replacement
2523
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    self.assertEqual(list(wrapper), ['jkl\n', 'opq\n'])
2534
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2544
class CTextIOWrapperTest(TextIOWrapperTest):
    # Adds three tests on top of those inherited from TextIOWrapperTest.

    def test_initialization(self):
        # A failed re-initialisation must leave the wrapper unusable:
        # read() raises ValueError afterwards.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        with self.assertRaises(TypeError):
            wrapper.__init__(buffered, newline=42)
        with self.assertRaises(ValueError):
            wrapper.read()
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(rawio)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Self-reference: only the cyclic collector can reclaim the wrapper.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
2585
2586
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Adds no tests of its own; every test is inherited from
    # TextIOWrapperTest.  Presumably the implementation under test is bound
    # to this class elsewhere in the file -- confirm there.
    pass
2589
2590
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Tests for self.IncrementalNewlineDecoder, which is not defined in
    # this class body.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        def _run(steps):
            # Each step is (input bytes, expected text, decode() kwargs);
            # steps are applied in order because the decoder is stateful.
            for raw, text, options in steps:
                _check_decode(raw, text, **options)

        final = {"final": True}

        _run([
            (b'\xe8\xa2\x88', "\u8888", {}),

            (b'\xe8', "", {}),
            (b'\xa2', "", {}),
            (b'\x88', "\u8888", {}),

            (b'\xe8', "", {}),
            (b'\xa2', "", {}),
            (b'\x88', "\u8888", {}),

            (b'\xe8', "", {}),
        ])
        # A truncated multi-byte sequence must be rejected at end of input.
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _run([
            (b'\n', "\n", {}),
            (b'\r', "", {}),
            (b'', "\n", final),
            (b'\r', "\n", final),

            (b'\r', "", {}),
            (b'a', "\na", {}),

            (b'\r\r\n', "\n\n", {}),
            (b'\r', "", {}),
            (b'\r', "\n", {}),
            (b'\na', "\na", {}),

            (b'\xe8\xa2\x88\r\n', "\u8888\n", {}),
            (b'\xe8\xa2\x88', "\u8888", {}),
            (b'\n', "\n", {}),
            (b'\xe8\xa2\x88\r', "\u8888", {}),
            (b'\n', "\n", {}),
        ])

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to the decoder one unit at a time and check both the
        # translated output and the decoder's newlines attribute.
        pieces = []
        if encoding is None:
            encoder = None

            def _decode_bytewise(s):
                # Decode one char at a time
                pieces.extend(decoder.decode(c) for c in s)
        else:
            encoder = codecs.getincrementalencoder(encoding)()

            def _decode_bytewise(s):
                # Decode one byte at a time
                pieces.extend(decoder.decode(bytes([b]))
                              for b in encoder.encode(s))

        self.assertEqual(decoder.newlines, None)
        # (text to feed, value of decoder.newlines expected afterwards)
        progression = [
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        ]
        for text, seen in progression:
            _decode_bytewise(text)
            self.assertEqual(decoder.newlines, seen)
        _decode_bytewise("abc\r")
        self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")

        # After reset() the decoder must have forgotten the newlines seen.
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            # enc is either None or a codec name; None is passed through.
            inner = enc and codecs.getincrementaldecoder(enc)()
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 must pass through untouched and must not
            # be recorded as newlines.
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)

        for translate in (False, True):
            _check(self.IncrementalNewlineDecoder(None, translate=translate))
2696
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds no tests of its own; every test is inherited from
    # IncrementalNewlineDecoderTest.  Presumably the decoder under test is
    # bound to this class elsewhere in the file -- confirm there.
    pass
2699
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds no tests of its own; every test is inherited from
    # IncrementalNewlineDecoderTest.  Presumably the decoder under test is
    # bound to this class elsewhere in the file -- confirm there.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002702
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002703
Guido van Rossum01a27522007-03-07 01:00:12 +00002704# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002705
Guido van Rossum5abbf752007-08-27 17:39:33 +00002706class MiscIOTest(unittest.TestCase):
2707
def tearDown(self):
    # Remove the scratch file that the tests in this class create.
    scratch = support.TESTFN
    support.unlink(scratch)
2710
def test___all__(self):
    # Every name in __all__ must exist on the module.  Apart from "open"
    # and the SEEK_* names, each must be an exception class or an IOBase
    # subclass.
    module = self.io
    for name in module.__all__:
        obj = getattr(module, name, None)
        self.assertTrue(obj is not None, name)
        if name == "open":
            continue
        is_exception_name = ("error" in name.lower()
                             or name == "UnsupportedOperation")
        if is_exception_name:
            self.assertTrue(issubclass(obj, Exception), name)
        elif not name.startswith("SEEK_"):
            self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002721
def test_attributes(self):
    # Check the name and mode attributes at each layer of the objects
    # returned by open().
    #
    # Every file is managed by ``with`` so it is closed even when an
    # assertion fails; previously close() was only reached on success,
    # while tearDown() unlinks the same path regardless.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    with self.open(support.TESTFN, "U") as f:
        self.assertEqual(f.name, support.TESTFN)
        self.assertEqual(f.buffer.name, support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")

    with self.open(support.TESTFN, "w+") as f:
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # g borrows f's descriptor (closefd=False), so it is opened and
        # closed inside f's block.
        with self.open(f.fileno(), "wb", closefd=False) as g:
            self.assertEqual(g.mode, "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name, f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
2748
def test_io_after_close(self):
    # Once a file is closed, every I/O method must raise ValueError.
    # The write modes come first so that the file exists by the time the
    # read modes open it.
    open_variants = []
    for base in ("w", "r", "w+"):
        binary = base + "b"
        open_variants += [
            {"mode": base},
            {"mode": binary},
            {"mode": base, "buffering": 1},
            {"mode": base, "buffering": 2},
            {"mode": binary, "buffering": 0},
        ]

    for kwargs in open_variants:
        f = self.open(support.TESTFN, **kwargs)
        f.close()
        empty = b"" if "b" in kwargs['mode'] else ""
        # (method name, call arguments, only checked if the object has it)
        checks = (
            ("flush", (), False),
            ("fileno", (), False),
            ("isatty", (), False),
            ("__iter__", (), False),
            ("peek", (1,), True),
            ("read", (), False),
            ("read1", (1024,), True),
            ("readall", (), True),
            ("readinto", (bytearray(1024),), True),
            ("readline", (), False),
            ("readlines", (), False),
            ("seek", (0,), False),
            ("tell", (), False),
            ("truncate", (), False),
            ("write", (empty,), False),
            ("writelines", ([],), False),
        )
        for method, call_args, optional in checks:
            if optional and not hasattr(f, method):
                continue
            self.assertRaises(ValueError, getattr(f, method), *call_args)
        self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002791
def test_blockingioerror(self):
    # Various BlockingIOError issues: an exception that takes part in a
    # reference cycle with one of its own arguments must be collectable.
    class C(str):
        pass

    payload = C("")
    error = self.BlockingIOError(1, payload)
    # Build the cycle: payload -> error -> payload.
    payload.b = error
    error.c = payload
    wr = weakref.ref(payload)
    del payload, error
    support.gc_collect()
    self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002804
def test_abcs(self):
    # Test the visible base classes are ABCs.
    visible_bases = (
        self.IOBase,
        self.RawIOBase,
        self.BufferedIOBase,
        self.TextIOBase,
    )
    for base in visible_bases:
        self.assertIsInstance(base, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002811
def _check_abc_inheritance(self, abcmodule):
    """Check which ABCs of *abcmodule* each kind of opened file matches.

    A file is opened unbuffered-binary, buffered-binary and text in turn.
    Each must be an instance of ``abcmodule.IOBase`` and of exactly one of
    RawIOBase, BufferedIOBase and TextIOBase.
    """
    specific_layers = ("RawIOBase", "BufferedIOBase", "TextIOBase")
    cases = (
        ("wb", {"buffering": 0}, "RawIOBase"),
        ("wb", {}, "BufferedIOBase"),
        ("w", {}, "TextIOBase"),
    )
    for mode, extra, expected_layer in cases:
        with self.open(support.TESTFN, mode, **extra) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            for layer in specific_layers:
                if layer == expected_layer:
                    check = self.assertIsInstance
                else:
                    check = self.assertNotIsInstance
                check(f, getattr(abcmodule, layer))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002828
def test_abc_inheritance(self):
    # Test implementations inherit from their respective ABCs.  The test
    # case itself serves as the namespace holding the ABCs.
    self._check_abc_inheritance(abcmodule=self)
2832
def test_abc_inheritance_official(self):
    # Test implementations inherit from the official ABCs of the
    # baseline "io" module.
    official = io
    self._check_abc_inheritance(abcmodule=official)
2837
def _check_warn_on_dealloc(self, *args, **kwargs):
    """Check that dropping an unclosed file emits a ResourceWarning.

    The arguments are passed straight to the builtin open().  The warning
    message must contain the repr() of the file object.
    """
    f = open(*args, **kwargs)
    expected = repr(f)
    with self.assertWarns(ResourceWarning) as caught:
        # Drop the only reference and force a collection.
        del f
        support.gc_collect()
    message = str(caught.warning.args[0])
    self.assertIn(expected, message)
2845
def test_warn_on_dealloc(self):
    # Unbuffered binary, buffered binary and text files, in that order.
    variants = (
        ("wb", {"buffering": 0}),
        ("wb", {}),
        ("w", {}),
    )
    for mode, extra in variants:
        self._check_warn_on_dealloc(support.TESTFN, mode, **extra)
2850
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
    """Check the ResourceWarning behaviour of files wrapping a descriptor.

    A file opened on the read end of a pipe must warn when it is dropped
    unclosed, unless it was opened with closefd=False.  *args and **kwargs
    are passed on to open() after the descriptor.
    """
    fds = []

    def cleanup_fds():
        # Close every pipe end created below; descriptors that a file
        # object has already closed report EBADF, which is ignored.
        for fd in fds:
            try:
                os.close(fd)
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise
    self.addCleanup(cleanup_fds)

    r, w = os.pipe()
    fds += r, w
    self._check_warn_on_dealloc(r, *args, **kwargs)

    # When using closefd=False, there's no warning
    r, w = os.pipe()
    fds += r, w
    with warnings.catch_warnings(record=True) as recorded:
        # ResourceWarning is ignored by the default filters; enable it
        # explicitly, otherwise nothing could ever be recorded and the
        # assertion below would pass vacuously.
        warnings.simplefilter("always", ResourceWarning)
        open(r, *args, closefd=False, **kwargs)
        support.gc_collect()
    self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002871
def test_warn_on_dealloc_fd(self):
    # Unbuffered binary, buffered binary and text readers, in that order.
    variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, extra in variants:
        self._check_warn_on_dealloc_fd(mode, **extra)
2876
2877
def test_pickling(self):
    # Pickling file objects is forbidden
    open_variants = []
    for base in ("w", "r", "w+"):
        binary = base + "b"
        open_variants += [
            {"mode": base},
            {"mode": binary},
            {"mode": binary, "buffering": 0},
        ]

    protocols = range(pickle.HIGHEST_PROTOCOL + 1)
    for kwargs in open_variants:
        for protocol in protocols:
            with self.open(support.TESTFN, **kwargs) as f:
                with self.assertRaises(TypeError):
                    pickle.dumps(f, protocol)
2894
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_bigbuf(self):
    # 16 KiB buffer.
    self._test_nonblock_pipe_write(bufsize=16 * 1024)
2898
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_smallbuf(self):
    # 1 KiB buffer.
    self._test_nonblock_pipe_write(bufsize=1024)
2902
def _set_non_blocking(self, fd):
    """Add O_NONBLOCK to the status flags of *fd*, checking both calls."""
    current = fcntl.fcntl(fd, fcntl.F_GETFL)
    self.assertNotEqual(current, -1)
    result = fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
    self.assertEqual(result, 0)
2908
def _test_nonblock_pipe_write(self, bufsize):
    """Push data through a non-blocking pipe with *bufsize*-byte buffers.

    The writer is driven until it raises BlockingIOError, the reader
    drains the pipe, and at the end everything that was reported as
    written must have been read back in order.
    """
    r, w = os.pipe()
    for fd in (r, w):
        self._set_non_blocking(fd)
    sent, received = [], []

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes.  For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
    wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

    with rf, wf:
        for N in 9999, 73, 7574:
            try:
                # Keep writing N-byte messages until the pipe is full.
                for i in count():
                    msg = bytes([i % 26 + 97]) * N
                    sent.append(msg)
                    wf.write(msg)
            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                # Only part of the last message was accepted.
                sent[-1] = sent[-1][:e.characters_written]
                received.append(rf.read())
                msg = b'BLOCKED'
                wf.write(msg)
                sent.append(msg)

        # Flush what is still buffered, draining the pipe whenever the
        # flush would block.
        flushed = False
        while not flushed:
            try:
                wf.flush()
            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                self.assertEqual(e.characters_written, 0)
                received.append(rf.read())
            else:
                flushed = True

        # Read until read() returns None.
        received.extend(iter(rf.read, None))

    sent_bytes = b''.join(sent)
    received_bytes = b''.join(received)
    self.assertTrue(sent_bytes == received_bytes)
    self.assertTrue(wf.closed)
    self.assertTrue(rf.closed)
2958
def test_create_fail(self):
    # 'x' mode fails if file is existing
    self.open(support.TESTFN, 'w').close()
    with self.assertRaises(FileExistsError):
        self.open(support.TESTFN, 'x')
2964
def test_create_writes(self):
    # 'x' mode opens for writing
    payload = b"spam"
    with self.open(support.TESTFN, 'xb') as f:
        f.write(payload)
    with self.open(support.TESTFN, 'rb') as f:
        contents = f.read()
    self.assertEqual(payload, contents)
2971
def test_open_allargs(self):
    # there used to be a buffer overflow in the parser for rawmode
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, 'rwax+')
2975
2976
class CMiscIOTest(MiscIOTest):
    """Run the MiscIOTest checks against the C implementation."""

    # Module under test: the C-accelerated "io" module.
    io = io
2979
class PyMiscIOTest(MiscIOTest):
    """Run the MiscIOTest checks against the pure-Python implementation."""

    # Module under test: the module bound to the name "pyio".
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002982
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002983
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Check how the io layers behave when SIGALRM interrupts reads/writes.

    Subclasses set the ``io`` attribute to the implementation under test.
    Each check arms SIGALRM with signal.alarm(); the "finally" clauses
    cancel it again with signal.alarm(0) so that an alarm left pending by
    a failed check cannot fire during a later test.
    """

    def setUp(self):
        # Install a handler that raises, remembering the previous one.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        # Put back the handler that was active before setUp().
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raises ZeroDivisionError from inside the handler.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data written repeatedly, *bytes* the byte
        string expected to be read back, and *fdopen_kwargs* are passed to
        open() for the write end of the pipe."""
        read_results = []
        def _read():
            # Keep SIGALRM out of the reader thread where possible.
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # Bound before the "try" so that "finally" can tell whether open()
        # succeeded instead of failing with UnboundLocalError.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                        wio.write, item * (support.PIPE_MAX_SIZE // len(item)))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            # Cancel the alarm in case it has not fired yet.
            signal.alarm(0)
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check a write() issued from a signal handler during a write().

        *data* is written in a loop until SIGALRM fires; the handler then
        writes to the same object.  *fdopen_kwargs* are passed to open()
        for the write end of the pipe."""
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel the alarm so on_alarm() cannot run after cleanup.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts what read() returns into a str for comparison;
        *fdopen_kwargs* are passed to open() for the read end of the pipe."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the "try" so that a failing open() is reported as
        # such and the pipe is still closed.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel the alarm in case it has not fired yet.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is a single unit of data, written N times in one call;
        *fdopen_kwargs* are passed to open() for the write end of the pipe."""
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = 1024 * 1024
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        def _read():
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: arm a second one, handled by alarm2().
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start draining the pipe.
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the "try" so that "finally" can tell whether open()
        # succeeded instead of failing with UnboundLocalError.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel whichever alarm may still be pending.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3173
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003174
class CSignalsTest(SignalsTest):
    """Run the SignalsTest checks against the C implementation."""

    # Module under test: the C-accelerated "io" module.
    io = io
3177
class PySignalsTest(SignalsTest):
    """Run the SignalsTest checks against the pure-Python implementation."""

    # Module under test: the module bound to the name "pyio".
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
3185
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003186
def test_main():
    """Give each test class its implementation's names, then run them all.

    Classes whose name starts with "C" receive the names of the ``io``
    module and the C variants of the mock classes; classes whose name
    starts with "Py" receive those of ``pyio`` and the Py variants.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {}
    for member in all_members:
        c_io_ns[member] = getattr(io, member)
    py_io_ns = {}
    for member in all_members:
        py_io_ns[member] = getattr(pyio, member)

    # Each mock is published under its plain name, bound to the "C..." or
    # "Py..." variant looked up in this module's globals.
    module_globals = globals()
    for mock in mocks:
        plain_name = mock.__name__
        c_io_ns[plain_name] = module_globals["C" + plain_name]
    for mock in mocks:
        plain_name = mock.__name__
        py_io_ns[plain_name] = module_globals["Py" + plain_name]

    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        class_name = test.__name__
        if class_name.startswith("C"):
            namespace = c_io_ns
        elif class_name.startswith("Py"):
            namespace = py_io_ns
        else:
            # Implementation-independent test class: nothing to inject.
            continue
        for attr_name, obj in namespace.items():
            setattr(test, attr_name, obj)

    support.run_unittest(*tests)


if __name__ == "__main__":
    test_main()