blob: 57b22c6d05546df155fb62357b23af15f5b77e05 [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations.  This is usually done by writing a base test that refers to
# the type it is testing as an attribute.  Then it provides custom subclasses
# to test both implementations.  This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Serhiy Storchaka441d30f2013-01-19 12:26:26 +020035import _testcapi
Antoine Pitrou131a4892012-10-16 22:57:11 +020036from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000038from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000040import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000041import io # C implementation of io
42import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000043try:
44 import threading
45except ImportError:
46 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010047try:
48 import fcntl
49except ImportError:
50 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000052def _default_chunk_size():
53 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000054 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000055 return f._CHUNK_SIZE
56
57
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    Reads are served from a scripted stack of chunks (a ``None`` entry makes
    readinto() return None once); writes are recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        # Copy the script so the caller's sequence is never mutated.
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Number of readinto()/read() calls, and how many of them happened
        # after the scripted data ran out.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        """Record a bytes copy of *b* and report it as fully written."""
        snapshot = bytes(b)
        self._write_stack.append(snapshot)
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary fixed descriptor number.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next scripted chunk (or part of it) into *buf*.

        Returns the number of bytes copied, None when the next scripted
        entry is None, or 0 once the script is exhausted.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Chunk does not fit: hand over a prefix, keep the remainder
            # at the head of the stack for the next call.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        """Pretend to truncate; simply echoes *pos* back."""
        return pos
113
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead layered on the C implementation's RawIOBase."""
116
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead layered on the Python implementation's RawIOBase."""
119
120
class MockRawIO(MockRawIOWithoutRead):
    """Scripted raw stream that also provides read().

    Each read() call returns the next entry of the read stack unchanged,
    ignoring the requested size.
    """

    def read(self, n=None):
        """Return the next scripted chunk, or b"" once the script is empty.

        *n* is accepted for interface compatibility but not used.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script is expected here; anything else
            # (KeyboardInterrupt, SystemExit, real bugs) must propagate.
            self._extraneous_reads += 1
            return b""
130
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO layered on the C implementation's RawIOBase."""
133
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO layered on the Python implementation's RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000136
Guido van Rossuma9e20242007-03-08 00:43:48 +0000137
class MisbehavedRawIO(MockRawIO):
    """Raw stream whose methods deliberately return bogus values."""

    def write(self, b):
        # Claim twice as many bytes were written as actually were.
        written = super().write(b)
        return written * 2

    def read(self, n=None):
        # Hand back the scripted chunk doubled.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        # A negative position is never valid.
        return -123

    def tell(self):
        # A negative position is never valid.
        return -456

    def readinto(self, buf):
        # Fill the buffer normally, then report five times its size.
        super().readinto(buf)
        return 5 * len(buf)
154
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO layered on the C implementation's RawIOBase."""
157
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO layered on the Python implementation's RawIOBase."""
160
161
class CloseFailureIO(MockRawIO):
    """Raw stream whose first close() marks it closed and raises IOError."""

    # Class-level default; shadowed by an instance attribute on first close().
    closed = 0

    def close(self):
        # Later calls are silent no-ops.
        if self.closed:
            return
        self.closed = 1
        raise IOError
169
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO layered on the C implementation's RawIOBase."""
172
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO layered on the Python implementation's RawIOBase."""
175
176
class MockFileIO:
    """Mixin that logs the size of every read()/readinto() result.

    Meant to be combined with a concrete stream class that supplies the
    actual __init__(data), read() and readinto().
    """

    def __init__(self, data):
        # Create the log before delegating to the concrete stream class.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        """Delegate to the next class and log len(result), or None."""
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        """Delegate to the next class and log the returned count."""
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000192
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO backed by the C implementation's BytesIO."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000195
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO backed by the Python implementation's BytesIO."""
198
199
class MockUnseekableIO:
    """Mixin that makes a stream report itself as not seekable.

    The class it is mixed into must provide an ``UnsupportedOperation``
    attribute naming the exception type to raise.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        # Arguments are ignored; seeking always fails.
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        # Arguments are ignored; telling always fails.
        error = self.UnsupportedOperation("not seekable")
        raise error
209
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """Unseekable BytesIO using the C implementation."""

    # Exception type raised by the mixin's seek()/tell().
    UnsupportedOperation = io.UnsupportedOperation
212
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """Unseekable BytesIO using the Python implementation."""

    # Exception type raised by the mixin's seek()/tell().
    UnsupportedOperation = pyio.UnsupportedOperation
215
216
class MockNonBlockWriterIO:
    """Raw writer that can be told to "block" on a chosen byte sequence.

    Written data accumulates in ``_write_stack`` until collected with
    pop_written().
    """

    def __init__(self):
        self._write_stack = []
        # No blocker armed initially; see block_on().
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and empty the record."""
        collected = b"".join(self._write_stack)
        self._write_stack.clear()
        return collected

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Write *b*, stopping short at an armed blocker.

        Returns the number of bytes accepted, or None when the data starts
        with the blocker (which is then disarmed).
        """
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                position = payload.index(blocker)
            except ValueError:
                # Blocker absent: fall through to a full write.
                position = None
            if position is not None:
                if position > 0:
                    # write data up to the first blocker
                    self._write_stack.append(payload[:position])
                    return position
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)
260
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO layered on the C implementation's RawIOBase."""

    # Matching BlockingIOError type for this implementation.
    BlockingIOError = io.BlockingIOError
263
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO layered on the Python implementation's RawIOBase."""

    # Matching BlockingIOError type for this implementation.
    BlockingIOError = pyio.BlockingIOError
266
Guido van Rossuma9e20242007-03-08 00:43:48 +0000267
class IOTest(unittest.TestCase):
    """Implementation-independent tests of open() and the basic io classes.

    The implementation under test is reached through attributes looked up
    on ``self`` (``open``, ``FileIO``, ``BytesIO``, ``IOBase``,
    ``UnsupportedOperation``, ``MockRawIOWithoutRead``, ...); this class
    does not define them itself.
    """

    def setUp(self):
        # Start every test without a leftover scratch file.
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Exercise write/seek/tell/truncate on the writable binary stream f.

        On a stream that starts empty this leaves b"hello world\\n" behind.
        """
        self.assertEqual(f.write(b"blah."), 5)
        # truncate() must not move the stream position.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        # Again, truncating leaves the position where it was.
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Exercise read/readinto/seek/tell on f, which must hold
        b"hello world\\n".

        With buffered=True, argument-less read() is checked as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Short read at end of data: only the first two bytes are replaced.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file tests (2 GiB).
    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write, truncate and read around offset LARGE on stream f."""
        # Use unittest assertions rather than bare ``assert`` so the checks
        # still run under ``python -O``.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # Position is unchanged by truncate(); only the size shrank.
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_open_handles_NUL_chars(self):
        # A NUL in the file name (str or bytes) must be rejected.
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
        self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')

    def test_raw_file_io(self):
        # Unbuffered binary file: capability flags plus the shared op suites.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        # Same as test_raw_file_io, with default buffering.
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        # readline() with and without a size limit, including an embedded
        # NUL and a final line lacking a newline.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        # Run the shared op suites against an in-memory BytesIO.
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a real skip instead of silently passing.
                self.skipTest(
                    "Testing large file ops skipped on %s. "
                    "It requires %d bytes and a long time. "
                    "Use 'regrtest.py -u largefile test_io' to run it."
                    % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on leaving the with block, whether it
        # exits normally or through an exception.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Destroying an unclosed FileIO subclass must go through the
        # overridden __del__, close() and flush(), in that order.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check that destroying a subclass of *base* runs its __del__,
        close() and flush() overrides in that order, with instance
        attributes still available."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        # Data written before close must be readable afterwards.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() of an array must report the size in bytes, not in items.
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        # closefd=False is rejected when opening by file name.
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        # Reading from a closed closefd=False wrapper raises ValueError.
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        # The raw layer exposes the closefd setting it was opened with.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Put the object in a reference cycle so only the cyclic GC
            # can reclaim it.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        # An error raised by flush() during close() must propagate, and
        # the file must still end up closed.
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed
        self.assertTrue(f.closed)

    def test_multi_close(self):
        # Closing repeatedly is harmless; flushing afterwards is an error.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        # Instances of these io types must carry a __dict__.
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))

    def test_opener(self):
        # The descriptor returned by a custom opener is what gets read,
        # regardless of the (non-existent) path passed to open().
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        fd = os.open(support.TESTFN, os.O_RDONLY)
        def opener(path, flags):
            return fd
        with self.open("non-existent", "r", opener=opener) as f:
            self.assertEqual(f.read(), "egg\n")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()
660
661
class CIOTest(IOTest):
    """IOTest plus a regression test that only CIOTest defines."""

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        instance = MyIO()
        # Self-reference so that only the cyclic collector can free it.
        instance.obj = instance
        ref = weakref.ref(instance)
        del MyIO
        del instance
        support.gc_collect()
        self.assertTrue(ref() is None, ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000681
class PyIOTest(IOTest):
    """IOTest with no additional tests of its own."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000684
Guido van Rossuma9e20242007-03-08 00:43:48 +0000685
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000686class CommonBufferedTests:
687 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
688
def test_detach(self):
    # detach() hands back the underlying raw stream; a second detach()
    # is an error.
    raw = self.MockRawIO()
    buf = self.tp(raw)
    detached = buf.detach()
    self.assertIs(detached, raw)
    with self.assertRaises(ValueError):
        buf.detach()
694
def test_fileno(self):
    # fileno() is forwarded from the raw stream (the mock answers 42).
    rawio = self.MockRawIO()
    bufio = self.tp(rawio)
    reported = bufio.fileno()
    self.assertEqual(42, reported)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000700
701 def test_no_fileno(self):
702 # XXX will we always have fileno() function? If so, kill
703 # this test. Else, write it.
704 pass
705
706 def test_invalid_args(self):
707 rawio = self.MockRawIO()
708 bufio = self.tp(rawio)
709 # Invalid whence
710 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200711 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000712
713 def test_override_destructor(self):
714 tp = self.tp
715 record = []
716 class MyBufferedIO(tp):
717 def __del__(self):
718 record.append(1)
719 try:
720 f = super().__del__
721 except AttributeError:
722 pass
723 else:
724 f()
725 def close(self):
726 record.append(2)
727 super().close()
728 def flush(self):
729 record.append(3)
730 super().flush()
731 rawio = self.MockRawIO()
732 bufio = MyBufferedIO(rawio)
733 writable = bufio.writable()
734 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000735 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000736 if writable:
737 self.assertEqual(record, [1, 2, 3])
738 else:
739 self.assertEqual(record, [1, 2])
740
741 def test_context_manager(self):
742 # Test usability as a context manager
743 rawio = self.MockRawIO()
744 bufio = self.tp(rawio)
745 def _with():
746 with bufio:
747 pass
748 _with()
749 # bufio should now be closed, and using it a second time should raise
750 # a ValueError.
751 self.assertRaises(ValueError, _with)
752
753 def test_error_through_destructor(self):
754 # Test that the exception state is not modified by a destructor,
755 # even if close() fails.
756 rawio = self.CloseFailureIO()
757 def f():
758 self.tp(rawio).xyzzy
759 with support.captured_output("stderr") as s:
760 self.assertRaises(AttributeError, f)
761 s = s.getvalue().strip()
762 if s:
763 # The destructor *may* have printed an unraisable error, check it
764 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000765 self.assertTrue(s.startswith("Exception IOError: "), s)
766 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000767
Antoine Pitrou716c4442009-05-23 19:04:03 +0000768 def test_repr(self):
769 raw = self.MockRawIO()
770 b = self.tp(raw)
771 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
772 self.assertEqual(repr(b), "<%s>" % clsname)
773 raw.name = "dummy"
774 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
775 raw.name = b"dummy"
776 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
777
Antoine Pitrou6be88762010-05-03 16:48:20 +0000778 def test_flush_error_on_close(self):
779 raw = self.MockRawIO()
780 def bad_flush():
781 raise IOError()
782 raw.flush = bad_flush
783 b = self.tp(raw)
784 self.assertRaises(IOError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600785 self.assertTrue(b.closed)
786
787 def test_close_error_on_close(self):
788 raw = self.MockRawIO()
789 def bad_flush():
790 raise IOError('flush')
791 def bad_close():
792 raise IOError('close')
793 raw.close = bad_close
794 b = self.tp(raw)
795 b.flush = bad_flush
796 with self.assertRaises(IOError) as err: # exception not swallowed
797 b.close()
798 self.assertEqual(err.exception.args, ('close',))
799 self.assertEqual(err.exception.__context__.args, ('flush',))
800 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000801
802 def test_multi_close(self):
803 raw = self.MockRawIO()
804 b = self.tp(raw)
805 b.close()
806 b.close()
807 b.close()
808 self.assertRaises(ValueError, b.flush)
809
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000810 def test_unseekable(self):
811 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
812 self.assertRaises(self.UnsupportedOperation, bufio.tell)
813 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
814
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000815 def test_readonly_attributes(self):
816 raw = self.MockRawIO()
817 buf = self.tp(raw)
818 x = self.MockRawIO()
819 with self.assertRaises(AttributeError):
820 buf.raw = x
821
Guido van Rossum78892e42007-04-06 17:31:18 +0000822
class SizeofTest:
    # Mixin checking that sys.getsizeof() grows by exactly the difference
    # between two buffer sizes.

    @support.cpython_only
    def test_sizeof(self):
        small, large = 4096, 8192
        small_obj = self.tp(self.MockRawIO(), buffer_size=small)
        # Size with the small buffer's byte count subtracted.
        overhead = sys.getsizeof(small_obj) - small
        large_obj = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(large_obj), overhead + large)
835
836
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Reader tests; subclasses in this file assign ``tp`` to the class
    # under test.
    read_mode = "rb"

    def test_constructor(self):
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        # Re-running __init__ with valid arguments must be harmless.
        reader.__init__(raw)
        for size in (1024, 16):
            reader.__init__(raw, buffer_size=size)
        self.assertEqual(b"abc", reader.read())
        # Non-positive buffer sizes are rejected.
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                reader.__init__(raw, buffer_size=bad_size)
        raw = self.MockRawIO([b"abc"])
        reader.__init__(raw)
        self.assertEqual(b"abc", reader.read())

    def test_read(self):
        for size in (None, 7):
            raw = self.MockRawIO((b"abc", b"d", b"efg"))
            reader = self.tp(raw)
            self.assertEqual(b"abcdefg", reader.read(size))
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read(-2)

    def test_read1(self):
        raw = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        self.assertEqual(b"a", reader.read(1))
        # (expected data, size passed to read1, raw reads issued so far)
        steps = (
            (b"b", 1, 1),
            (b"c", 100, 1),
            (b"d", 100, 2),
            (b"efg", 100, 3),
            (b"", 100, 4),
        )
        for expected, size, raw_reads in steps:
            self.assertEqual(expected, reader.read1(size))
            self.assertEqual(raw._reads, raw_reads)
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read1(-1)

    def test_readinto(self):
        raw = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        target = bytearray(2)
        # (bytes reported, buffer contents afterwards); a short read leaves
        # the tail of the buffer untouched.
        steps = ((2, b"ab"), (2, b"cd"), (2, b"ef"), (1, b"gf"), (0, b"gf"))
        for filled, contents in steps:
            self.assertEqual(reader.readinto(target), filled)
            self.assertEqual(target, contents)
        raw = self.MockRawIO((b"abc", None))
        reader = self.tp(raw)
        for filled, contents in ((2, b"ab"), (1, b"cb")):
            self.assertEqual(reader.readinto(target), filled)
            self.assertEqual(target, contents)

    def test_readlines(self):
        def fresh_reader():
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))
        every_line = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(fresh_reader().readlines(), every_line)
        self.assertEqual(fresh_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh_reader().readlines(None), every_line)

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # (buffer size, sizes read from the buffered object,
        #  expected read_history of the raw object)
        scenarios = (
            (100, (3, 1, 4, 8), [dlen, 0]),
            (100, (3, 3, 3), [dlen]),
            (4, (1, 2, 4, 2), [4, 4, 1]),
        )

        for bufsize, requested, raw_expected in scenarios:
            raw = self.MockFileIO(data)
            reader = self.tp(raw, buffer_size=bufsize)
            offset = 0
            for nbytes in requested:
                self.assertEqual(reader.read(nbytes),
                                 data[offset:offset+nbytes])
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(raw.read_history, raw_expected)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        raw = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        reader = self.tp(raw)
        self.assertEqual(b"abcd", reader.read(6))
        self.assertEqual(b"e", reader.read(1))
        self.assertEqual(b"fg", reader.read())
        self.assertEqual(b"", reader.peek(1))
        self.assertIsNone(reader.read())
        self.assertEqual(b"", reader.read())

        raw = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", raw.readall())
        self.assertIsNone(raw.readall())

    def test_read_past_eof(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read(9000))

    def test_read_all(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                reader = self.tp(raw, 8)
                errors = []
                results = []
                def worker():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for size in cycle([1, 19]):
                            chunk = reader.read(size)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=worker) for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                # Every byte value must show up exactly N times overall.
                for i in range(256):
                    self.assertEqual(combined.count(bytes(bytearray([i]))), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        reader = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            reader.tell()
        with self.assertRaises(self.UnsupportedOperation):
            reader.seek(0)
        # Same checks again once some data has been read.
        reader.read(1)
        with self.assertRaises(self.UnsupportedOperation):
            reader.seek(0)
        with self.assertRaises(self.UnsupportedOperation):
            reader.tell()

    def test_misbehaved_io(self):
        reader = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        with self.assertRaises(IOError):
            reader.seek(0)
        with self.assertRaises(IOError):
            reader.tell()

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            # Simple case first: one raw read is enough to satisfy the
            # request.  Then a more complex case where two raw reads are
            # needed to satisfy the request.
            for chunks in ([b"x" * n], [b"x" * (n - 1), b"x"]):
                raw = self.MockRawIO(chunks)
                reader = self.tp(raw, bufsize)
                self.assertEqual(reader.read(n), b"x" * n)
                self.assertEqual(raw._extraneous_reads, 0,
                    "failed for {}: {} != 0".format(n, raw._extraneous_reads))
1032
1033
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the reader tests (plus the sizeof check) against io.BufferedReader.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each rejected __init__ call, read() must raise ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The FileIO below creates TESTFN on disk; remove it once the test
        # is over so later tests start from a clean slate.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Self-reference: only the cycle collector can reclaim the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        # assertIsNone shows the surviving object if collection failed.
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001074
class PyBufferedReaderTest(BufferedReaderTest):
    # Same reader tests, with pyio.BufferedReader as the class under test.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001077
Guido van Rossuma9e20242007-03-08 00:43:48 +00001078
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Writer tests; subclasses in this file assign ``tp`` to the class
    # under test.
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-running __init__ with valid arguments must be harmless.
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        # Non-positive buffer sizes are rejected.
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push pending data down to the raw object first.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every single write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Clamp the final write so it stops exactly at the end.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Both helpers move the position around and then restore it.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, but the lines come from a UserList.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Destroying the writer must flush buffered data to the raw object.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN on disk; remove it once the test is over
        # so it does not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # truncate() does not move the stream position.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Every byte value must have been written exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument is no longer accepted.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()
        def bad_write(b):
            raise IOError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(IOError, b.close) # exception not swallowed
        self.assertTrue(b.closed)
1326
Benjamin Peterson59406a92009-03-26 17:10:29 +00001327
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the writer tests (plus the sizeof check) against io.BufferedWriter.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each rejected __init__ call, write() must raise ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        # The FileIO below creates TESTFN on disk; remove it once the test
        # is over so later tests start from a clean slate.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Self-reference: only the cycle collector can reclaim the object.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        # assertIsNone shows the surviving object if collection failed.
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1365
1366
class PyBufferedWriterTest(BufferedWriterTest):
    # Same writer tests, with pyio.BufferedWriter as the class under test.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001369
Guido van Rossum01a27522007-03-07 01:00:12 +00001370class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001371
def test_constructor(self):
    # A freshly constructed pair reports itself as open.
    reader_raw = self.MockRawIO()
    writer_raw = self.MockRawIO()
    pair = self.tp(reader_raw, writer_raw)
    self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001375
def test_detach(self):
    # detach() is expected to raise UnsupportedOperation on a pair.
    pair = self.tp(self.MockRawIO(), self.MockRawIO())
    with self.assertRaises(self.UnsupportedOperation):
        pair.detach()
1379
def test_constructor_max_buffer_size_removal(self):
    # A fourth positional argument must be rejected with TypeError.
    reader_raw = self.MockRawIO()
    writer_raw = self.MockRawIO()
    self.assertRaises(TypeError, self.tp, reader_raw, writer_raw, 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001383
def test_constructor_with_not_readable(self):
    # The reader side claims to be unreadable, so construction must fail.
    class NotReadable(MockRawIO):
        def readable(self):
            return False

    unreadable = NotReadable()
    writer_raw = self.MockRawIO()
    with self.assertRaises(IOError):
        self.tp(unreadable, writer_raw)
1390
def test_constructor_with_not_writeable(self):
    # The writer side claims to be unwritable, so construction must fail.
    class NotWriteable(MockRawIO):
        def writable(self):
            return False

    reader_raw = self.MockRawIO()
    unwritable = NotWriteable()
    with self.assertRaises(IOError):
        self.tp(reader_raw, unwritable)
1397
def test_read(self):
    pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

    # Sized reads first, then an unsized read for whatever is left.
    for size, expected in ((3, b"abc"), (1, b"d")):
        self.assertEqual(pair.read(size), expected)
    self.assertEqual(pair.read(), b"ef")
    # An explicit None size reads everything.
    pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
    self.assertEqual(pair.read(None), b"abc")
1406
1407 def test_readlines(self):
1408 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1409 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1410 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1411 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001412
1413 def test_read1(self):
1414 # .read1() is delegated to the underlying reader object, so this test
1415 # can be shallow.
1416 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1417
1418 self.assertEqual(pair.read1(3), b"abc")
1419
1420 def test_readinto(self):
1421 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1422
1423 data = bytearray(5)
1424 self.assertEqual(pair.readinto(data), 5)
1425 self.assertEqual(data, b"abcde")
1426
1427 def test_write(self):
1428 w = self.MockRawIO()
1429 pair = self.tp(self.MockRawIO(), w)
1430
1431 pair.write(b"abc")
1432 pair.flush()
1433 pair.write(b"def")
1434 pair.flush()
1435 self.assertEqual(w._write_stack, [b"abc", b"def"])
1436
1437 def test_peek(self):
1438 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1439
1440 self.assertTrue(pair.peek(3).startswith(b"abc"))
1441 self.assertEqual(pair.read(3), b"abc")
1442
1443 def test_readable(self):
1444 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1445 self.assertTrue(pair.readable())
1446
1447 def test_writeable(self):
1448 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1449 self.assertTrue(pair.writable())
1450
1451 def test_seekable(self):
1452 # BufferedRWPairs are never seekable, even if their readers and writers
1453 # are.
1454 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1455 self.assertFalse(pair.seekable())
1456
1457 # .flush() is delegated to the underlying writer object and has been
1458 # tested in the test_write method.
1459
1460 def test_close_and_closed(self):
1461 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1462 self.assertFalse(pair.closed)
1463 pair.close()
1464 self.assertTrue(pair.closed)
1465
1466 def test_isatty(self):
1467 class SelectableIsAtty(MockRawIO):
1468 def __init__(self, isatty):
1469 MockRawIO.__init__(self)
1470 self._isatty = isatty
1471
1472 def isatty(self):
1473 return self._isatty
1474
1475 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1476 self.assertFalse(pair.isatty())
1477
1478 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1479 self.assertTrue(pair.isatty())
1480
1481 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1482 self.assertTrue(pair.isatty())
1483
1484 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1485 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001486
class CBufferedRWPairTest(BufferedRWPairTest):
    """Bind the BufferedRWPairTest suite to io.BufferedRWPair."""

    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001489
class PyBufferedRWPairTest(BufferedRWPairTest):
    """Bind the BufferedRWPairTest suite to pyio.BufferedRWPair."""

    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001492
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001493
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    """Shared checks for a BufferedRandom implementation.

    Inherits both the reader and the writer suites and adds checks for
    interleaved reads, writes, seeks and truncation.  The class under test
    is read from ``self.tp``.
    """

    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run the constructor checks of both parent suites.
        for suite in (BufferedReaderTest, BufferedWriterTest):
            suite.test_constructor(self)

    def test_read_and_write(self):
        mock = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(mock, 8)

        self.assertEqual(b"as", stream.read(2))
        for chunk in (b"ddd", b"eee"):
            stream.write(chunk)
        # Buffer writes: nothing has reached the raw object yet.
        self.assertFalse(mock._write_stack)
        self.assertEqual(b"ghjk", stream.read())
        self.assertEqual(b"dddeee", mock._write_stack[0])

    def test_seek_and_tell(self):
        backing = self.BytesIO(b"asdfghjkl")
        stream = self.tp(backing)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        # Overwrite four bytes at the current position, then re-read it all.
        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        # whence=2: relative to the end; whence=1: relative to the position.
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", backing.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, stream.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # read_func(stream[, n]) performs one read-like call and returns the
        # bytes it obtained.
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        self.assertEqual(b"ab", read_func(stream, 2))
        stream.write(b"12")
        self.assertEqual(b"ef", read_func(stream, 2))
        self.assertEqual(6, stream.tell())
        stream.flush()
        self.assertEqual(6, stream.tell())
        self.assertEqual(b"ghi", read_func(stream))
        # Change the raw stream behind the buffered object's back.
        backing.seek(0, 0)
        backing.write(b"XYZ")
        # flush() resets the read buffer
        stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(stream, 3))

    def test_flush_and_read(self):
        def _read(stream, *args):
            return stream.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(stream, n=-1):
            # A negative n means "read everything": use a large buffer.
            size = n if n >= 0 else 9999
            target = bytearray(size)
            filled = stream.readinto(target)
            return bytes(target[:filled])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(stream, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = stream.peek(n)
            if n != -1:
                data = data[:n]
            # peek() does not advance the position, so do it explicitly.
            stream.seek(len(data), 1)
            return data
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        for chunk in (b"123", b"45"):
            stream.write(chunk)
            stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"12345fghi", backing.getvalue())
        self.assertEqual(b"12345fghi", stream.read())

    def test_threads(self):
        # Run the threading checks of both parent suites.
        for suite in (BufferedReaderTest, BufferedWriterTest):
            suite.test_threads(self)

    def test_writes_and_peek(self):
        def _peek_only(stream):
            stream.peek(1)
        self.check_writes(_peek_only)

        def _peek_previous_byte(stream):
            # Step back one byte, peek, then restore the position.
            here = stream.tell()
            stream.seek(-1, 1)
            stream.peek(1)
            stream.seek(here, 0)
        self.check_writes(_peek_previous_byte)

    def test_writes_and_reads(self):
        def _reread_last_byte(stream):
            stream.seek(-1, 1)
            stream.read(1)
        self.check_writes(_reread_last_byte)

    def test_writes_and_read1s(self):
        def _reread_last_byte(stream):
            stream.seek(-1, 1)
            stream.read1(1)
        self.check_writes(_reread_last_byte)

    def test_writes_and_readintos(self):
        def _reread_last_byte(stream):
            stream.seek(-1, 1)
            stream.readinto(bytearray(1))
        self.check_writes(_reread_last_byte)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            backing = self.BytesIO(b"A" * 10)
            stream = self.tp(backing, 4)
            # Trigger readahead
            self.assertEqual(stream.read(1), b"A")
            self.assertEqual(stream.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            stream.write(b"B" * overwrite_size)
            end_of_write = overwrite_size + 1
            self.assertEqual(stream.tell(), end_of_write)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            stream.flush()
            self.assertEqual(stream.tell(), end_of_write)
            expected = (b"A" + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            self.assertEqual(backing.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards /
        # writing again
        def mutate(stream, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            stream.seek(pos1)
            stream.read(pos2 - pos1)
            stream.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            stream.seek(pos1)
            stream.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        length = len(original)
        for first in range(length):
            for second in range(first, length):
                backing = self.BytesIO(original)
                stream = self.tp(backing, 100)
                mutate(stream, first, second)
                stream.flush()
                # The later write at `first` wins when both positions match.
                expected = bytearray(original)
                expected[second] = 2
                expected[first] = 1
                self.assertEqual(backing.getvalue(), expected,
                                 "failed result for i=%d, j=%d"
                                 % (first, second))

    def test_truncate_after_read_or_write(self):
        backing = self.BytesIO(b"A" * 10)
        stream = self.tp(backing, 100)
        # The read buffer gets filled; truncate() must report position 2.
        self.assertEqual(stream.read(2), b"AA")
        self.assertEqual(stream.truncate(), 2)
        # The write buffer increases; truncate() must report position 4.
        self.assertEqual(stream.write(b"BB"), 2)
        self.assertEqual(stream.truncate(), 4)

    def test_misbehaved_io(self):
        # Run the misbehaved-raw-stream checks of both parent suites.
        for suite in (BufferedReaderTest, BufferedWriterTest):
            suite.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as backing, \
                self.tp(backing, 100) as stream:
            stream.write(b"1")
            self.assertEqual(stream.read(1), b'b')
            stream.write(b'2')
            self.assertEqual(stream.read1(1), b'd')
            stream.write(b'3')
            one_byte = bytearray(1)
            stream.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            stream.write(b'4')
            self.assertEqual(stream.peek(1), b'h')
            stream.flush()
            self.assertEqual(backing.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as backing, \
                self.tp(backing, 100) as stream:
            self.assertEqual(stream.read(1), b'a')
            stream.write(b"2")
            self.assertEqual(stream.read(1), b'c')
            stream.flush()
            self.assertEqual(backing.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as backing, \
                self.tp(backing) as stream:
            # Each write replaces one byte; each readline() returns the rest
            # of that line.
            stream.write(b'1')
            self.assertEqual(stream.readline(), b'b\n')
            stream.write(b'2')
            self.assertEqual(stream.readline(), b'def\n')
            stream.write(b'3')
            self.assertEqual(stream.readline(), b'\n')
            stream.flush()
            self.assertEqual(backing.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1717
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    """Bind the BufferedRandomTest suite to io.BufferedRandom."""

    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with a buffer size of sys.maxsize must fail.
        oversized_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(oversized_errors,
                          bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Run the garbage-collection checks of both C suites.
        for suite in (CBufferedReaderTest, CBufferedWriterTest):
            suite.test_garbage_collection(self)
1734
class PyBufferedRandomTest(BufferedRandomTest):
    """Bind the BufferedRandomTest suite to pyio.BufferedRandom."""

    tp = pyio.BufferedRandom
1737
1738
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001739# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1740# properties:
1741# - A single output character can correspond to many bytes of input.
1742# - The number of input bytes to complete the character can be
1743# undetermined until the last input byte is received.
1744# - The number of input bytes can vary depending on previous input.
1745# - A single input byte can correspond to many characters of output.
1746# - The number of output characters can be undetermined until the
1747# last input byte is received.
1748# - The number of output characters can vary depending on previous input.
1749
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words are either fixed-length (I bytes
    each, when I > 0) or variable-length and period-terminated (when I is
    0).  In variable-length mode, extra periods are ignored.  Possible
    words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        A missing number means 0, i.e. variable-length mode.
      - 'o' followed by a number sets the output length, O (maximum 99).
        A missing number means 0.
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  EOF (final=True) also terminates the
    last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Restore I = O = 1 and discard any pending bytes."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (pending bytes, flags) with flags == 0 right after reset()."""
        # XOR each length with 1 so that the initial I = O = 1 packs to 0.
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        """Restore a (pending bytes, flags) pair produced by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return the text of finished words."""
        period = ord('.')
        pieces = []
        for byte in input:
            if self.i == 0 and byte == period:
                # Variable-length mode: a period ends the word; periods
                # arriving with nothing buffered are ignored.
                if self.buffer:
                    pieces.append(self.process_word())
                continue
            self.buffer.append(byte)
            # Fixed-length mode: the word ends after self.i bytes.
            if self.i != 0 and len(self.buffer) == self.i:
                pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i...' and 'o...' words update I or O and produce no text.
        """
        command, argument = self.buffer[0], self.buffer[1:]
        if command == ord('i'):
            self.i = min(99, int(argument or 0))  # set input length
            word = ''
        elif command == ord('o'):
            self.o = min(99, int(argument or 0))  # set output length
            word = ''
        else:
            word = self.buffer.decode('ascii')
            if len(word) < self.o:
                word += '-' * self.o  # pad out with hyphens
            if self.o:
                word = word[:self.o]  # truncate to output length
            word += '.'
        self.buffer = bytearray()
        return word

    # The search function below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: answer only for 'test_decoder' when enabled."""
        if not (cls.codecEnabled and name == 'test_decoder'):
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder',
            encode=latin1.encode,
            decode=None,
            incrementalencoder=None,
            streamreader=None,
            streamwriter=None,
            incrementaldecoder=cls)


# Register the previous decoder for testing.
# Disabled by default, tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1838
1839
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001882
1883class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001884
def setUp(self):
    # Fixture: bytes with mixed line endings, plus the text obtained when
    # every line ending is "\n".
    mixed_newlines = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    unix_newlines = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.testdata = mixed_newlines
    self.normalized = unix_newlines.decode("ascii")
    # Start each test without a leftover support.TESTFN file.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001889
def tearDown(self):
    # Remove support.TESTFN again once the test has finished.
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001892
def test_constructor(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    # Re-initialising replaces the encoding and line_buffering settings.
    text.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(text.encoding, "latin-1")
    self.assertEqual(text.line_buffering, False)
    text.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(text.encoding, "utf-8")
    self.assertEqual(text.line_buffering, True)
    # The two leading bytes decode as a single character under UTF-8.
    self.assertEqual("\xe9\n", text.readline())

    # Invalid newline arguments: wrong type, then wrong value.
    reinit = text.__init__
    with self.assertRaises(TypeError):
        reinit(buffered, newline=42)
    with self.assertRaises(ValueError):
        reinit(buffered, newline='xyzzy')
1906
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)

    # detach() hands back the very object that was wrapped.
    text = self.TextIOWrapper(buffered)
    self.assertIs(text.detach(), buffered)

    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    # Nothing has reached the raw stream before detaching ...
    self.assertFalse(raw.getvalue())
    text.detach()
    # ... and everything has afterwards.
    self.assertEqual(raw.getvalue(), b"howdy")
    # Detaching a second time is an error.
    detach_again = text.detach
    with self.assertRaises(ValueError):
        detach_again()
1919
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    # Without a name or mode only the encoding is shown.
    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    # A name set on the raw stream appears in the repr.
    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    # So does a mode set on the wrapper itself.
    text.mode = "r"
    expected = ("<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(text), expected)

    # A bytes name is shown with its b prefix.
    raw.name = b"dummy"
    expected = ("<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(text), expected)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001936
def test_line_buffering(self):
    sink = self.BytesIO()
    buffered = self.BufferedWriter(sink, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)

    # (text written, bytes expected in the raw stream right afterwards)
    steps = (
        ("X", b""),                  # No flush happened
        ("Y\nZ", b"XY\nZ"),          # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for written, flushed in steps:
        text.write(written)
        self.assertEqual(sink.getvalue(), flushed)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001947
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for variable in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(variable, None)

        expected_encoding = locale.getpreferredencoding(False)
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw)
        self.assertEqual(text.encoding, expected_encoding)
    finally:
        # Put the environment back exactly as it was.
        os.environ.clear()
        os.environ.update(saved_environ)
1965
Serhiy Storchaka441d30f2013-01-19 12:26:26 +02001966 # Issue 15989
def test_device_encoding(self):
    # A fileno() result just above INT_MAX, then just above UINT_MAX, must
    # make the constructor raise OverflowError.
    stream = self.BytesIO()
    stream.fileno = lambda: _testcapi.INT_MAX + 1
    with self.assertRaises(OverflowError):
        self.TextIOWrapper(stream)
    stream.fileno = lambda: _testcapi.UINT_MAX + 1
    with self.assertRaises(OverflowError):
        self.TextIOWrapper(stream)
1973
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    explicit = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(explicit.encoding, "utf-8")
    implicit = self.TextIOWrapper(raw)
    self.assertIsNotNone(implicit.encoding)
    # lookup() raises if the name does not identify a known codec.
    codecs.lookup(implicit.encoding)
1982
def test_encoding_errors_reading(self):
    # The byte 0xff cannot be decoded as ASCII.
    payload = b"abc\n\xff\n"

    # (1) default
    text = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii")
    self.assertRaises(UnicodeError, text.read)
    # (2) explicit strict
    text = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                              errors="strict")
    self.assertRaises(UnicodeError, text.read)
    # (3) ignore
    text = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                              errors="ignore")
    self.assertEqual(text.read(), "abc\n\n")
    # (4) replace
    text = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                              errors="replace")
    self.assertEqual(text.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002000
def test_encoding_errors_writing(self):
    # "\xff" cannot be encoded as ASCII.
    # (1) default
    sink = self.BytesIO()
    text = self.TextIOWrapper(sink, encoding="ascii")
    self.assertRaises(UnicodeError, text.write, "\xff")
    # (2) explicit strict
    sink = self.BytesIO()
    text = self.TextIOWrapper(sink, encoding="ascii", errors="strict")
    self.assertRaises(UnicodeError, text.write, "\xff")

    # (3) ignore drops the character, (4) replace writes "?" instead.
    lenient_cases = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for handler, written in lenient_cases:
        sink = self.BytesIO()
        text = self.TextIOWrapper(sink, encoding="ascii", errors=handler,
                                  newline="\n")
        text.write("abc\xffdef\n")
        text.flush()
        self.assertEqual(sink.getvalue(), written)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002024
def test_newlines(self):
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

    # Pairs of (newline argument, lines expected when reading back).
    tests = [
        [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
        ['', input_lines],
        ['\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]],
        ['\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]],
        ['\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    def read_in_pieces(textio):
        # Alternate read(2) and readline() until read(2) returns ''.
        lines = []
        while True:
            head = textio.read(2)
            if head == '':
                return lines
            self.assertEqual(len(head), 2)
            lines.append(head + textio.readline())

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    joined = ''.join(input_lines)
    for encoding in encodings:
        data = joined.encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        got_lines = read_in_pieces(textio)
                    else:
                        got_lines = list(textio)

                    # Compare line by line first, then the overall count.
                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002066
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")

    # Pairs of (newline argument, lines expected from readlines()).
    cases = [
        (None, normalized.decode("ascii").splitlines(keepends=True)),
        ("", testdata.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        raw = self.BytesIO(testdata)
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        self.assertEqual(text.readlines(), expected)
        # Reading everything at once must match the joined lines.
        text.seek(0)
        self.assertEqual(text.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002082
def test_newlines_output(self):
    # Bytes expected in the raw stream for each explicit newline argument.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to write the same bytes as os.linesep.
    tests = [(None, testdict[os.linesep])]
    tests.extend(sorted(testdict.items()))

    pieces = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        for piece in pieces:
            text.write(piece)
        text.flush()
        self.assertEqual(raw.closed, False)
        self.assertEqual(raw.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002100
def test_destructor(self):
    # Dropping the only reference to the wrapper must end up calling
    # close() on the underlying buffer, which by then holds the text.
    seen_on_close = []
    parent = self.BytesIO

    class MyBytesIO(parent):
        def close(self):
            # Record the buffer contents at the moment of closing.
            seen_on_close.append(self.getvalue())
            parent.close(self)

    raw = MyBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_on_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002114
def test_override_destructor(self):
    # Destroying a subclass instance must call __del__, close() and
    # flush(), in that order (recorded as 1, 2, 3).
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The parent class may or may not define __del__.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2137
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    output = captured.getvalue().strip()
    if not output:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(output.splitlines()), 1)
    self.assertTrue(output.startswith("Exception IOError: "), output)
    self.assertTrue(output.endswith(" ignored"), output)
Guido van Rossum8358db22007-08-18 21:39:55 +00002152
Guido van Rossum9b76da62007-04-11 01:09:03 +00002153 # Systematic tests of the text I/O API
2154
def test_basic_io(self):
    # Round-trip write/read/seek/tell through a text file for a range of
    # chunk sizes and encodings.  The files are opened with `with` so the
    # handles are closed even when an assertion fails part-way through.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position cookie for end-of-file, reused below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                # The multi-line sample contains non-Latin-1 characters,
                # so it is only run for the UTF encodings.
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2183
def multi_line_test(self, f, enc):
    # Rewrite `f` with lines of assorted lengths, remembering tell()
    # before each write; reading the file back line by line must give
    # the same (position, line) pairs.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    period = len(sample)
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        line = "".join(sample[i % period] for i in range(size)) + "\n"
        written.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    read_back = []
    pos = f.tell()
    line = f.readline()
    while line:
        read_back.append((pos, line))
        pos = f.tell()
        line = f.readline()
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002205
def test_telling(self):
    # tell() must report the same positions while reading lines back as
    # it did while writing them, and must raise IOError while the file is
    # being iterated.  The file is opened with `with` so it is closed
    # even when an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            self.assertRaises(IOError, f.tell)
        # Once iteration has finished, tell() works again.
        self.assertEqual(f.tell(), p2)
2225
def test_seeking(self):
    # Each line is an ASCII prefix two characters shorter than the
    # default chunk size followed by a multi-byte character and "\n".
    # After reading exactly the prefix, tell() and readline() must agree.
    prefix_len = _default_chunk_size() - 2
    text_prefix = "a" * prefix_len
    byte_prefix = text_prefix.encode("utf-8")
    self.assertEqual(len(text_prefix), len(byte_prefix))
    text_suffix = "\u8888\n"
    byte_line = byte_prefix + text_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as f:
        f.write(byte_line * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_len)
        self.assertEqual(head, byte_prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_len)
        self.assertEqual(f.readline(), text_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002242
def test_seeking_too(self):
    # Regression test for a specific bug
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as binary_file:
        binary_file.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as text_file:
        text_file._CHUNK_SIZE  # Just test that it exists
        # A chunk size of 2 is smaller than the three-byte character.
        text_file._CHUNK_SIZE = 2
        text_file.readline()
        text_file.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002253
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_seek_and_tell(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened with `with` so that a failing assertion
        # does not leave the handle open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                # NOTE(review): _CHUNK_SIZE is not set on these re-opened
                # files, so they use the default chunk size -- confirm
                # whether that is intended.
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_seek_and_tell(data)

        # Position each test case so that it crosses a chunk boundary.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(data)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            check_seek_and_tell(prefix + data, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002300
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002301 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002302 data = "1234567890"
2303 tests = ("utf-16",
2304 "utf-16-le",
2305 "utf-16-be",
2306 "utf-32",
2307 "utf-32-le",
2308 "utf-32-be")
2309 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002310 buf = self.BytesIO()
2311 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002312 # Check if the BOM is written only once (see issue1753).
2313 f.write(data)
2314 f.write(data)
2315 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002316 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002317 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002318 self.assertEqual(f.read(), data * 2)
2319 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002320
Benjamin Petersona1b49012009-03-31 23:11:32 +00002321 def test_unreadable(self):
2322 class UnReadable(self.BytesIO):
2323 def readable(self):
2324 return False
2325 txt = self.TextIOWrapper(UnReadable())
2326 self.assertRaises(IOError, txt.read)
2327
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002328 def test_read_one_by_one(self):
2329 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002330 reads = ""
2331 while True:
2332 c = txt.read(1)
2333 if not c:
2334 break
2335 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002336 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002337
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002338 def test_readlines(self):
2339 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2340 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2341 txt.seek(0)
2342 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2343 txt.seek(0)
2344 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2345
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002346 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002347 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002348 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002349 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002350 reads = ""
2351 while True:
2352 c = txt.read(128)
2353 if not c:
2354 break
2355 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002356 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002357
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002358 def test_writelines(self):
2359 l = ['ab', 'cd', 'ef']
2360 buf = self.BytesIO()
2361 txt = self.TextIOWrapper(buf)
2362 txt.writelines(l)
2363 txt.flush()
2364 self.assertEqual(buf.getvalue(), b'abcdef')
2365
2366 def test_writelines_userlist(self):
2367 l = UserList(['ab', 'cd', 'ef'])
2368 buf = self.BytesIO()
2369 txt = self.TextIOWrapper(buf)
2370 txt.writelines(l)
2371 txt.flush()
2372 self.assertEqual(buf.getvalue(), b'abcdef')
2373
2374 def test_writelines_error(self):
2375 txt = self.TextIOWrapper(self.BytesIO())
2376 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2377 self.assertRaises(TypeError, txt.writelines, None)
2378 self.assertRaises(TypeError, txt.writelines, b'abc')
2379
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002380 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002381 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002382
2383 # read one char at a time
2384 reads = ""
2385 while True:
2386 c = txt.read(1)
2387 if not c:
2388 break
2389 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002390 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002391
2392 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002393 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002394 txt._CHUNK_SIZE = 4
2395
2396 reads = ""
2397 while True:
2398 c = txt.read(4)
2399 if not c:
2400 break
2401 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002402 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002403
2404 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002405 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002406 txt._CHUNK_SIZE = 4
2407
2408 reads = txt.read(4)
2409 reads += txt.read(4)
2410 reads += txt.readline()
2411 reads += txt.readline()
2412 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002413 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002414
2415 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002416 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002417 txt._CHUNK_SIZE = 4
2418
2419 reads = txt.read(4)
2420 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002421 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002422
2423 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002424 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002425 txt._CHUNK_SIZE = 4
2426
2427 reads = txt.read(4)
2428 pos = txt.tell()
2429 txt.seek(0)
2430 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002431 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002432
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002433 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002434 buffer = self.BytesIO(self.testdata)
2435 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002436
2437 self.assertEqual(buffer.seekable(), txt.seekable())
2438
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN

    def stored_bytes():
        with self.open(path, 'rb') as binary_file:
            return binary_file.read()

    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
            # The result is unused; the tell() call itself is kept.
            f.tell()
        self.assertEqual(stored_bytes(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as f:
            f.write('xxx')
        self.assertEqual(stored_bytes(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002453
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_of_text = writer.tell()
        with self.open(path, 'r+', encoding=charset) as updater:
            # Write after the existing text first, then overwrite the
            # start of the file.
            updater.seek(end_of_text)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(path, 'rb') as binary_file:
            stored = binary_file.read()
            self.assertEqual(stored, 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002468
def test_errors_property(self):
    # `errors` is "strict" when not given, otherwise the value passed.
    path = support.TESTFN
    with self.open(path, "w") as default_file:
        self.assertEqual(default_file.errors, "strict")
    with self.open(path, "w", errors="replace") as replacing_file:
        self.assertEqual(replacing_file.errors, "replace")
2474
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start_signal = threading.Event()
    thread_count = 20
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def write_line(n):
            line = "Thread%03d\n" % n
            # Block until every thread has been started.
            start_signal.wait()
            f.write(line)

        workers = [threading.Thread(target=write_line, args=(n,))
                   for n in range(thread_count)]
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        start_signal.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for n in range(thread_count):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002497
Antoine Pitrou6be88762010-05-03 16:48:20 +00002498 def test_flush_error_on_close(self):
2499 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2500 def bad_flush():
2501 raise IOError()
2502 txt.flush = bad_flush
2503 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002504 self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002505
2506 def test_multi_close(self):
2507 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2508 txt.close()
2509 txt.close()
2510 txt.close()
2511 self.assertRaises(ValueError, txt.flush)
2512
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must raise
    # UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    unsupported = self.UnsupportedOperation
    self.assertRaises(unsupported, wrapper.tell)
    self.assertRaises(unsupported, wrapper.seek, 0)
2517
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002518 def test_readonly_attributes(self):
2519 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2520 buf = self.BytesIO(self.testdata)
2521 with self.assertRaises(AttributeError):
2522 txt.buffer = buf
2523
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    self.assertEqual(list(wrapper), ['jkl\n', 'opq\n'])
2534
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    # No flush() was called, yet the raw object has every byte.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2544
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02002545 def test_read_nonbytes(self):
2546 # Issue #17106
2547 # Crash when underlying read() returns non-bytes
2548 t = self.TextIOWrapper(self.StringIO('a'))
2549 self.assertRaises(TypeError, t.read, 1)
2550 t = self.TextIOWrapper(self.StringIO('a'))
2551 self.assertRaises(TypeError, t.readline)
2552 t = self.TextIOWrapper(self.StringIO('a'))
2553 self.assertRaises(TypeError, t.read)
2554
def test_illegal_decoder(self):
    # Issue #17106
    # Crash when decoder returns non-string
    def fresh_wrapper():
        # A new wrapper is built for every call under test.
        return self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
                                  encoding='quopri_codec')

    self.assertRaises(TypeError, fresh_wrapper().read, 1)
    self.assertRaises(TypeError, fresh_wrapper().readline)
    self.assertRaises(TypeError, fresh_wrapper().read)
2567
2568
class CTextIOWrapperTest(TextIOWrapperTest):
    # Adds checks aimed at the C TextIOWrapper on top of the tests
    # inherited from TextIOWrapperTest.

    def test_initialization(self):
        # After a rejected call to __init__(), read() must raise
        # ValueError.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        for error, bad_newline in ((TypeError, 42), (ValueError, 'xyzzy')):
            self.assertRaises(error, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw_file = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw_file)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Create a reference cycle so only the cyclic GC can free it.
        wrapper.x = wrapper
        wrapper_ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(wrapper_ref() is None, wrapper_ref)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
2609
2610
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Adds nothing of its own: it only re-runs the inherited tests.
    # Presumably bound to the pure-Python implementation elsewhere in
    # this file -- confirm against the test runner setup.
    pass
2613
2614
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Shared tests for IncrementalNewlineDecoder; the class under test is
    # read from self.IncrementalNewlineDecoder.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def check(data, expected, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(data, **kwargs), expected)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(data, **kwargs), expected)

        def check_all(pairs):
            for data, expected in pairs:
                check(data, expected)

        # One three-byte character, whole and then split into single
        # bytes (the split sequence is fed twice).
        split_character = [(b'\xe8', ""), (b'\xa2', ""), (b'\x88', "\u8888")]
        check_all([(b'\xe8\xa2\x88', "\u8888")])
        check_all(split_character)
        check_all(split_character)

        # A truncated character must raise once input is declared final.
        check(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'',
                          final=True)

        decoder.reset()
        check_all([(b'\n', "\n"), (b'\r', "")])
        check(b'', "\n", final=True)
        check(b'\r', "\n", final=True)

        check_all([
            (b'\r', ""),
            (b'a', "\na"),

            (b'\r\r\n', "\n\n"),
            (b'\r', ""),
            (b'\r', "\n"),
            (b'\na', "\na"),

            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        ])

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to `decoder` one unit at a time and check both the
        # translated output and the `newlines` attribute along the way.
        decoded = []
        if encoding is None:
            encoder = None

            def feed(text):
                # Decode one char at a time
                for char in text:
                    decoded.append(decoder.decode(char))
        else:
            encoder = codecs.getincrementalencoder(encoding)()

            def feed(text):
                # Decode one byte at a time
                for byte in encoder.encode(text):
                    decoded.append(decoder.decode(bytes([byte])))

        self.assertEqual(decoder.newlines, None)
        feed("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        feed("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        feed("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        feed("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        feed("abc\r")
        self.assertEqual("".join(decoded), "abc\n\nabcabc\nabcabc")
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc:
                inner = codecs.getincrementaldecoder(enc)()
            else:
                inner = enc
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            self.assertEqual(dec.newlines, None)
            # Neither of these characters is a newline.
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
2720
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own: it only re-runs the inherited tests.
    # Presumably bound to the C implementation elsewhere in this file --
    # confirm against the test runner setup.
    pass
2723
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own: it only re-runs the inherited tests.
    # Presumably bound to the pure-Python implementation elsewhere in
    # this file -- confirm against the test runner setup.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002726
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002727
Guido van Rossum01a27522007-03-07 01:00:12 +00002728# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002729
Guido van Rossum5abbf752007-08-27 17:39:33 +00002730class MiscIOTest(unittest.TestCase):
2731
def tearDown(self):
    # Remove the file that the tests in this class create at
    # support.TESTFN.
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
2734
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002735 def test___all__(self):
2736 for name in self.io.__all__:
2737 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002738 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002739 if name == "open":
2740 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002741 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002742 self.assertTrue(issubclass(obj, Exception), name)
2743 elif not name.startswith("SEEK_"):
2744 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002745
def test_attributes(self):
    # Check the `name` and `mode` attributes reported at each layer of
    # the objects returned by open().  Every file is opened with `with`
    # so that a failing assertion does not leave a handle open.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    with self.open(support.TESTFN, "U") as f:
        self.assertEqual(f.name, support.TESTFN)
        self.assertEqual(f.buffer.name, support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")

    with self.open(support.TESTFN, "w+") as f:
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # A second file object on the same descriptor; closefd=False
        # leaves closing the descriptor to `f`.
        with self.open(f.fileno(), "wb", closefd=False) as g:
            self.assertEqual(g.mode, "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name, f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
2772
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002773 def test_io_after_close(self):
2774 for kwargs in [
2775 {"mode": "w"},
2776 {"mode": "wb"},
2777 {"mode": "w", "buffering": 1},
2778 {"mode": "w", "buffering": 2},
2779 {"mode": "wb", "buffering": 0},
2780 {"mode": "r"},
2781 {"mode": "rb"},
2782 {"mode": "r", "buffering": 1},
2783 {"mode": "r", "buffering": 2},
2784 {"mode": "rb", "buffering": 0},
2785 {"mode": "w+"},
2786 {"mode": "w+b"},
2787 {"mode": "w+", "buffering": 1},
2788 {"mode": "w+", "buffering": 2},
2789 {"mode": "w+b", "buffering": 0},
2790 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002791 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002792 f.close()
2793 self.assertRaises(ValueError, f.flush)
2794 self.assertRaises(ValueError, f.fileno)
2795 self.assertRaises(ValueError, f.isatty)
2796 self.assertRaises(ValueError, f.__iter__)
2797 if hasattr(f, "peek"):
2798 self.assertRaises(ValueError, f.peek, 1)
2799 self.assertRaises(ValueError, f.read)
2800 if hasattr(f, "read1"):
2801 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002802 if hasattr(f, "readall"):
2803 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002804 if hasattr(f, "readinto"):
2805 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2806 self.assertRaises(ValueError, f.readline)
2807 self.assertRaises(ValueError, f.readlines)
2808 self.assertRaises(ValueError, f.seek, 0)
2809 self.assertRaises(ValueError, f.tell)
2810 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002811 self.assertRaises(ValueError, f.write,
2812 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002813 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002814 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002815
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002816 def test_blockingioerror(self):
2817 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002818 class C(str):
2819 pass
2820 c = C("")
2821 b = self.BlockingIOError(1, c)
2822 c.b = b
2823 b.c = c
2824 wr = weakref.ref(c)
2825 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002826 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002827 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002828
2829 def test_abcs(self):
2830 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002831 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2832 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2833 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2834 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002835
2836 def _check_abc_inheritance(self, abcmodule):
2837 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002838 self.assertIsInstance(f, abcmodule.IOBase)
2839 self.assertIsInstance(f, abcmodule.RawIOBase)
2840 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2841 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002842 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002843 self.assertIsInstance(f, abcmodule.IOBase)
2844 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2845 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2846 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002847 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002848 self.assertIsInstance(f, abcmodule.IOBase)
2849 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2850 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2851 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002852
2853 def test_abc_inheritance(self):
2854 # Test implementations inherit from their respective ABCs
2855 self._check_abc_inheritance(self)
2856
2857 def test_abc_inheritance_official(self):
2858 # Test implementations inherit from the official ABCs of the
2859 # baseline "io" module.
2860 self._check_abc_inheritance(io)
2861
Antoine Pitroue033e062010-10-29 10:38:18 +00002862 def _check_warn_on_dealloc(self, *args, **kwargs):
2863 f = open(*args, **kwargs)
2864 r = repr(f)
2865 with self.assertWarns(ResourceWarning) as cm:
2866 f = None
2867 support.gc_collect()
2868 self.assertIn(r, str(cm.warning.args[0]))
2869
2870 def test_warn_on_dealloc(self):
2871 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2872 self._check_warn_on_dealloc(support.TESTFN, "wb")
2873 self._check_warn_on_dealloc(support.TESTFN, "w")
2874
2875 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2876 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002877 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002878 for fd in fds:
2879 try:
2880 os.close(fd)
2881 except EnvironmentError as e:
2882 if e.errno != errno.EBADF:
2883 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002884 self.addCleanup(cleanup_fds)
2885 r, w = os.pipe()
2886 fds += r, w
2887 self._check_warn_on_dealloc(r, *args, **kwargs)
2888 # When using closefd=False, there's no warning
2889 r, w = os.pipe()
2890 fds += r, w
2891 with warnings.catch_warnings(record=True) as recorded:
2892 open(r, *args, closefd=False, **kwargs)
2893 support.gc_collect()
2894 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002895
2896 def test_warn_on_dealloc_fd(self):
2897 self._check_warn_on_dealloc_fd("rb", buffering=0)
2898 self._check_warn_on_dealloc_fd("rb")
2899 self._check_warn_on_dealloc_fd("r")
2900
2901
Antoine Pitrou243757e2010-11-05 21:15:39 +00002902 def test_pickling(self):
2903 # Pickling file objects is forbidden
2904 for kwargs in [
2905 {"mode": "w"},
2906 {"mode": "wb"},
2907 {"mode": "wb", "buffering": 0},
2908 {"mode": "r"},
2909 {"mode": "rb"},
2910 {"mode": "rb", "buffering": 0},
2911 {"mode": "w+"},
2912 {"mode": "w+b"},
2913 {"mode": "w+b", "buffering": 0},
2914 ]:
2915 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2916 with self.open(support.TESTFN, **kwargs) as f:
2917 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2918
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002919 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2920 def test_nonblock_pipe_write_bigbuf(self):
2921 self._test_nonblock_pipe_write(16*1024)
2922
2923 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2924 def test_nonblock_pipe_write_smallbuf(self):
2925 self._test_nonblock_pipe_write(1024)
2926
2927 def _set_non_blocking(self, fd):
2928 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2929 self.assertNotEqual(flags, -1)
2930 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2931 self.assertEqual(res, 0)
2932
2933 def _test_nonblock_pipe_write(self, bufsize):
2934 sent = []
2935 received = []
2936 r, w = os.pipe()
2937 self._set_non_blocking(r)
2938 self._set_non_blocking(w)
2939
2940 # To exercise all code paths in the C implementation we need
2941 # to play with buffer sizes. For instance, if we choose a
2942 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2943 # then we will never get a partial write of the buffer.
2944 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2945 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2946
2947 with rf, wf:
2948 for N in 9999, 73, 7574:
2949 try:
2950 i = 0
2951 while True:
2952 msg = bytes([i % 26 + 97]) * N
2953 sent.append(msg)
2954 wf.write(msg)
2955 i += 1
2956
2957 except self.BlockingIOError as e:
2958 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01002959 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002960 sent[-1] = sent[-1][:e.characters_written]
2961 received.append(rf.read())
2962 msg = b'BLOCKED'
2963 wf.write(msg)
2964 sent.append(msg)
2965
2966 while True:
2967 try:
2968 wf.flush()
2969 break
2970 except self.BlockingIOError as e:
2971 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01002972 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002973 self.assertEqual(e.characters_written, 0)
2974 received.append(rf.read())
2975
2976 received += iter(rf.read, None)
2977
2978 sent, received = b''.join(sent), b''.join(received)
2979 self.assertTrue(sent == received)
2980 self.assertTrue(wf.closed)
2981 self.assertTrue(rf.closed)
2982
Charles-François Natalidc3044c2012-01-09 22:40:02 +01002983 def test_create_fail(self):
2984 # 'x' mode fails if file is existing
2985 with self.open(support.TESTFN, 'w'):
2986 pass
2987 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
2988
2989 def test_create_writes(self):
2990 # 'x' mode opens for writing
2991 with self.open(support.TESTFN, 'xb') as f:
2992 f.write(b"spam")
2993 with self.open(support.TESTFN, 'rb') as f:
2994 self.assertEqual(b"spam", f.read())
2995
Christian Heimes7b648752012-09-10 14:48:43 +02002996 def test_open_allargs(self):
2997 # there used to be a buffer overflow in the parser for rawmode
2998 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
2999
3000
class CMiscIOTest(MiscIOTest):
    """Run the MiscIOTest cases against the C implementation (``io``)."""

    io = io
3003
class PyMiscIOTest(MiscIOTest):
    """Run the MiscIOTest cases against the Python implementation (``pyio``)."""

    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00003006
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003007
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Check how file objects behave when I/O is interrupted by SIGALRM.

    This base class is exercised through its ``C``/``Py`` subclasses, which
    set ``io`` to the module under test.  Every helper arming an alarm
    cancels it again in a ``finally`` clause, so that a failing test cannot
    leave a pending SIGALRM behind once tearDown() has restored the previous
    handler.
    """

    def setUp(self):
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Default SIGALRM handler for these tests: raise ZeroDivisionError.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter."""
        read_results = []
        def _read():
            # Keep SIGALRM away from this helper thread where possible.
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # Bound before the ``try`` so that the cleanup code below can tell
        # whether open() succeeded.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                        wio.write, item * (support.PIPE_MAX_SIZE // len(item)))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            # Cancel the alarm in case the test failed before it fired.
            signal.alarm(0)
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check that a write() issued from a signal handler while the same
        object is already writing either fails with RuntimeError or lets the
        handler's own exception propagate."""
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel the alarm in case the test failed before it fired.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the ``try`` so that a failing open() is reported as
        # such and the pipe still gets closed.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel the alarm in case the test failed before it fired.
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully."""
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = 1024 * 1024
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        def _read():
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: only re-arm the timer with the second handler.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start draining the pipe.
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the ``try`` so that the cleanup code below can tell
        # whether open() succeeded.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel the alarm in case the test failed before it fired.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3197
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003198
class CSignalsTest(SignalsTest):
    """Run the SignalsTest cases against the C implementation (``io``)."""

    io = io
3201
class PySignalsTest(SignalsTest):
    """Run the SignalsTest cases against the Python implementation (``pyio``)."""

    io = pyio

    # _pyio does not guard against reentrant calls (doing so would slow it
    # down even more), so the inherited reentrancy tests are switched off.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3209
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003210
def test_main():
    """Give each test class its implementation's namespace and run them all.

    Classes whose name starts with "C" receive the names of the C ``io``
    module, classes whose name starts with "Py" those of ``pyio``; both also
    receive the matching "C"/"Py" flavour of every mock class under the
    mock's unprefixed name.  Other classes are left untouched.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Mock classes made available on the test classes; each one has a
    # "C" and a "Py" variant defined at module level.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)

    # Public names of the io module, plus the newline decoder.
    exported = io.__all__ + ["IncrementalNewlineDecoder"]
    c_namespace = dict((attr, getattr(io, attr)) for attr in exported)
    py_namespace = dict((attr, getattr(pyio, attr)) for attr in exported)

    module_globals = globals()
    for mock in mocks:
        c_namespace[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mocks:
        py_namespace[mock.__name__] = module_globals["Py" + mock.__name__]

    # Avoid turning open into a bound method.
    py_namespace["open"] = pyio.OpenWrapper

    for test in tests:
        class_name = test.__name__
        if class_name.startswith("C"):
            namespace = c_namespace
        elif class_name.startswith("Py"):
            namespace = py_namespace
        else:
            # Implementation-independent test class: nothing to inject.
            continue
        for attr, value in namespace.items():
            setattr(test, attr, value)

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00003245
# Run the whole io test suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()