blob: 28d3e0f4ac186db34a4107253599a1c75eac6066 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000038
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000039import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000042try:
43 import threading
44except ImportError:
45 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010046try:
47 import fcntl
48except ImportError:
49 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000050
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000051def _default_chunk_size():
52 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000053 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000054 return f._CHUNK_SIZE
55
56
Antoine Pitrou328ec742010-09-14 18:37:24 +000057class MockRawIOWithoutRead:
58 """A RawIO implementation without read(), so as to exercise the default
59 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000060
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000061 def __init__(self, read_stack=()):
62 self._read_stack = list(read_stack)
63 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000065 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000066
Guido van Rossum01a27522007-03-07 01:00:12 +000067 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000068 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000069 return len(b)
70
71 def writable(self):
72 return True
73
Guido van Rossum68bbcd22007-02-27 17:19:33 +000074 def fileno(self):
75 return 42
76
77 def readable(self):
78 return True
79
Guido van Rossum01a27522007-03-07 01:00:12 +000080 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000081 return True
82
Guido van Rossum01a27522007-03-07 01:00:12 +000083 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000084 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000085
86 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000087 return 0 # same comment as above
88
89 def readinto(self, buf):
90 self._reads += 1
91 max_len = len(buf)
92 try:
93 data = self._read_stack[0]
94 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000095 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000096 return 0
97 if data is None:
98 del self._read_stack[0]
99 return None
100 n = len(data)
101 if len(data) <= max_len:
102 del self._read_stack[0]
103 buf[:n] = data
104 return n
105 else:
106 buf[:] = data[:max_len]
107 self._read_stack[0] = data[max_len:]
108 return max_len
109
110 def truncate(self, pos=None):
111 return pos
112
Antoine Pitrou328ec742010-09-14 18:37:24 +0000113class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
114 pass
115
116class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
117 pass
118
119
120class MockRawIO(MockRawIOWithoutRead):
121
122 def read(self, n=None):
123 self._reads += 1
124 try:
125 return self._read_stack.pop(0)
126 except:
127 self._extraneous_reads += 1
128 return b""
129
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000130class CMockRawIO(MockRawIO, io.RawIOBase):
131 pass
132
133class PyMockRawIO(MockRawIO, pyio.RawIOBase):
134 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000135
Guido van Rossuma9e20242007-03-08 00:43:48 +0000136
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000137class MisbehavedRawIO(MockRawIO):
138 def write(self, b):
139 return super().write(b) * 2
140
141 def read(self, n=None):
142 return super().read(n) * 2
143
144 def seek(self, pos, whence):
145 return -123
146
147 def tell(self):
148 return -456
149
150 def readinto(self, buf):
151 super().readinto(buf)
152 return len(buf) * 5
153
154class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
155 pass
156
157class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
158 pass
159
160
161class CloseFailureIO(MockRawIO):
162 closed = 0
163
164 def close(self):
165 if not self.closed:
166 self.closed = 1
167 raise IOError
168
169class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
170 pass
171
172class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
173 pass
174
175
176class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000177
178 def __init__(self, data):
179 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000181
182 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000183 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000184 self.read_history.append(None if res is None else len(res))
185 return res
186
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000187 def readinto(self, b):
188 res = super().readinto(b)
189 self.read_history.append(res)
190 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000191
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000192class CMockFileIO(MockFileIO, io.BytesIO):
193 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000194
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000195class PyMockFileIO(MockFileIO, pyio.BytesIO):
196 pass
197
198
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000199class MockUnseekableIO:
200 def seekable(self):
201 return False
202
203 def seek(self, *args):
204 raise self.UnsupportedOperation("not seekable")
205
206 def tell(self, *args):
207 raise self.UnsupportedOperation("not seekable")
208
209class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
210 UnsupportedOperation = io.UnsupportedOperation
211
212class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
213 UnsupportedOperation = pyio.UnsupportedOperation
214
215
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000216class MockNonBlockWriterIO:
217
218 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000219 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000220 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000222 def pop_written(self):
223 s = b"".join(self._write_stack)
224 self._write_stack[:] = []
225 return s
226
227 def block_on(self, char):
228 """Block when a given char is encountered."""
229 self._blocker_char = char
230
231 def readable(self):
232 return True
233
234 def seekable(self):
235 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000236
Guido van Rossum01a27522007-03-07 01:00:12 +0000237 def writable(self):
238 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000239
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000240 def write(self, b):
241 b = bytes(b)
242 n = -1
243 if self._blocker_char:
244 try:
245 n = b.index(self._blocker_char)
246 except ValueError:
247 pass
248 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100249 if n > 0:
250 # write data up to the first blocker
251 self._write_stack.append(b[:n])
252 return n
253 else:
254 # cancel blocker and indicate would block
255 self._blocker_char = None
256 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000257 self._write_stack.append(b)
258 return len(b)
259
260class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
261 BlockingIOError = io.BlockingIOError
262
263class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
264 BlockingIOError = pyio.BlockingIOError
265
Guido van Rossuma9e20242007-03-08 00:43:48 +0000266
Guido van Rossum28524c72007-02-27 05:47:44 +0000267class IOTest(unittest.TestCase):
268
Neal Norwitze7789b12008-03-24 06:18:09 +0000269 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000270 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000271
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000272 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000273 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000274
Guido van Rossum28524c72007-02-27 05:47:44 +0000275 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000276 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000277 f.truncate(0)
278 self.assertEqual(f.tell(), 5)
279 f.seek(0)
280
281 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000282 self.assertEqual(f.seek(0), 0)
283 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000284 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000285 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000286 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000287 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000288 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000289 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000290 self.assertEqual(f.seek(-1, 2), 13)
291 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000292
Guido van Rossum87429772007-04-10 21:06:59 +0000293 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000294 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000295 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000296
Guido van Rossum9b76da62007-04-11 01:09:03 +0000297 def read_ops(self, f, buffered=False):
298 data = f.read(5)
299 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000300 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000301 self.assertEqual(f.readinto(data), 5)
302 self.assertEqual(data, b" worl")
303 self.assertEqual(f.readinto(data), 2)
304 self.assertEqual(len(data), 5)
305 self.assertEqual(data[:2], b"d\n")
306 self.assertEqual(f.seek(0), 0)
307 self.assertEqual(f.read(20), b"hello world\n")
308 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000309 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000310 self.assertEqual(f.seek(-6, 2), 6)
311 self.assertEqual(f.read(5), b"world")
312 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000313 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000314 self.assertEqual(f.seek(-6, 1), 5)
315 self.assertEqual(f.read(5), b" worl")
316 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000317 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000318 if buffered:
319 f.seek(0)
320 self.assertEqual(f.read(), b"hello world\n")
321 f.seek(6)
322 self.assertEqual(f.read(), b"world\n")
323 self.assertEqual(f.read(), b"")
324
Guido van Rossum34d69e52007-04-10 20:08:41 +0000325 LARGE = 2**31
326
Guido van Rossum53807da2007-04-10 19:01:47 +0000327 def large_file_ops(self, f):
328 assert f.readable()
329 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000330 self.assertEqual(f.seek(self.LARGE), self.LARGE)
331 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000332 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000333 self.assertEqual(f.tell(), self.LARGE + 3)
334 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000335 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000336 self.assertEqual(f.tell(), self.LARGE + 2)
337 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000338 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000339 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000340 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
341 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000342 self.assertEqual(f.read(2), b"x")
343
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000344 def test_invalid_operations(self):
345 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000346 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000347 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000348 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000349 self.assertRaises(exc, fp.read)
350 self.assertRaises(exc, fp.readline)
351 with self.open(support.TESTFN, "wb", buffering=0) as fp:
352 self.assertRaises(exc, fp.read)
353 self.assertRaises(exc, fp.readline)
354 with self.open(support.TESTFN, "rb", buffering=0) as fp:
355 self.assertRaises(exc, fp.write, b"blah")
356 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000357 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000358 self.assertRaises(exc, fp.write, b"blah")
359 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000360 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000361 self.assertRaises(exc, fp.write, "blah")
362 self.assertRaises(exc, fp.writelines, ["blah\n"])
363 # Non-zero seeking from current or end pos
364 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
365 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000366
Antoine Pitrou13348842012-01-29 18:36:34 +0100367 def test_open_handles_NUL_chars(self):
368 fn_with_NUL = 'foo\0bar'
369 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
370 self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
371
Guido van Rossum28524c72007-02-27 05:47:44 +0000372 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000373 with self.open(support.TESTFN, "wb", buffering=0) as f:
374 self.assertEqual(f.readable(), False)
375 self.assertEqual(f.writable(), True)
376 self.assertEqual(f.seekable(), True)
377 self.write_ops(f)
378 with self.open(support.TESTFN, "rb", buffering=0) as f:
379 self.assertEqual(f.readable(), True)
380 self.assertEqual(f.writable(), False)
381 self.assertEqual(f.seekable(), True)
382 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000383
Guido van Rossum87429772007-04-10 21:06:59 +0000384 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000385 with self.open(support.TESTFN, "wb") as f:
386 self.assertEqual(f.readable(), False)
387 self.assertEqual(f.writable(), True)
388 self.assertEqual(f.seekable(), True)
389 self.write_ops(f)
390 with self.open(support.TESTFN, "rb") as f:
391 self.assertEqual(f.readable(), True)
392 self.assertEqual(f.writable(), False)
393 self.assertEqual(f.seekable(), True)
394 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000395
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000396 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000397 with self.open(support.TESTFN, "wb") as f:
398 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
399 with self.open(support.TESTFN, "rb") as f:
400 self.assertEqual(f.readline(), b"abc\n")
401 self.assertEqual(f.readline(10), b"def\n")
402 self.assertEqual(f.readline(2), b"xy")
403 self.assertEqual(f.readline(4), b"zzy\n")
404 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000405 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000406 self.assertRaises(TypeError, f.readline, 5.3)
407 with self.open(support.TESTFN, "r") as f:
408 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000409
Guido van Rossum28524c72007-02-27 05:47:44 +0000410 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000411 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000412 self.write_ops(f)
413 data = f.getvalue()
414 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000415 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000416 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000417
Guido van Rossum53807da2007-04-10 19:01:47 +0000418 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000419 # On Windows and Mac OSX this test comsumes large resources; It takes
420 # a long time to build the >2GB file and takes >2GB of disk space
421 # therefore the resource must be enabled to run this test.
422 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000423 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000424 print("\nTesting large file ops skipped on %s." % sys.platform,
425 file=sys.stderr)
426 print("It requires %d bytes and a long time." % self.LARGE,
427 file=sys.stderr)
428 print("Use 'regrtest.py -u largefile test_io' to run it.",
429 file=sys.stderr)
430 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000431 with self.open(support.TESTFN, "w+b", 0) as f:
432 self.large_file_ops(f)
433 with self.open(support.TESTFN, "w+b") as f:
434 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000435
436 def test_with_open(self):
437 for bufsize in (0, 1, 100):
438 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000439 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000440 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000441 self.assertEqual(f.closed, True)
442 f = None
443 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000444 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000445 1/0
446 except ZeroDivisionError:
447 self.assertEqual(f.closed, True)
448 else:
449 self.fail("1/0 didn't raise an exception")
450
Antoine Pitrou08838b62009-01-21 00:55:13 +0000451 # issue 5008
452 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000454 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000455 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000456 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000457 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000458 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000459 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000460 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000461
Guido van Rossum87429772007-04-10 21:06:59 +0000462 def test_destructor(self):
463 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000464 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000465 def __del__(self):
466 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000467 try:
468 f = super().__del__
469 except AttributeError:
470 pass
471 else:
472 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000473 def close(self):
474 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000475 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000476 def flush(self):
477 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000478 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000479 with support.check_warnings(('', ResourceWarning)):
480 f = MyFileIO(support.TESTFN, "wb")
481 f.write(b"xxx")
482 del f
483 support.gc_collect()
484 self.assertEqual(record, [1, 2, 3])
485 with self.open(support.TESTFN, "rb") as f:
486 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000487
488 def _check_base_destructor(self, base):
489 record = []
490 class MyIO(base):
491 def __init__(self):
492 # This exercises the availability of attributes on object
493 # destruction.
494 # (in the C version, close() is called by the tp_dealloc
495 # function, not by __del__)
496 self.on_del = 1
497 self.on_close = 2
498 self.on_flush = 3
499 def __del__(self):
500 record.append(self.on_del)
501 try:
502 f = super().__del__
503 except AttributeError:
504 pass
505 else:
506 f()
507 def close(self):
508 record.append(self.on_close)
509 super().close()
510 def flush(self):
511 record.append(self.on_flush)
512 super().flush()
513 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000514 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000515 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000516 self.assertEqual(record, [1, 2, 3])
517
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000518 def test_IOBase_destructor(self):
519 self._check_base_destructor(self.IOBase)
520
521 def test_RawIOBase_destructor(self):
522 self._check_base_destructor(self.RawIOBase)
523
524 def test_BufferedIOBase_destructor(self):
525 self._check_base_destructor(self.BufferedIOBase)
526
527 def test_TextIOBase_destructor(self):
528 self._check_base_destructor(self.TextIOBase)
529
Guido van Rossum87429772007-04-10 21:06:59 +0000530 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000531 with self.open(support.TESTFN, "wb") as f:
532 f.write(b"xxx")
533 with self.open(support.TESTFN, "rb") as f:
534 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000535
Guido van Rossumd4103952007-04-12 05:44:49 +0000536 def test_array_writes(self):
537 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000538 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000539 with self.open(support.TESTFN, "wb", 0) as f:
540 self.assertEqual(f.write(a), n)
541 with self.open(support.TESTFN, "wb") as f:
542 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000543
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000544 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000545 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000546 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000547
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000548 def test_read_closed(self):
549 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000550 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000551 with self.open(support.TESTFN, "r") as f:
552 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000553 self.assertEqual(file.read(), "egg\n")
554 file.seek(0)
555 file.close()
556 self.assertRaises(ValueError, file.read)
557
558 def test_no_closefd_with_filename(self):
559 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000560 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000561
562 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000563 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000564 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000565 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000566 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000567 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000568 self.assertEqual(file.buffer.raw.closefd, False)
569
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000570 def test_garbage_collection(self):
571 # FileIO objects are collected, and collecting them flushes
572 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000573 with support.check_warnings(('', ResourceWarning)):
574 f = self.FileIO(support.TESTFN, "wb")
575 f.write(b"abcxxx")
576 f.f = f
577 wr = weakref.ref(f)
578 del f
579 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000580 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000581 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000582 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000583
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000584 def test_unbounded_file(self):
585 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
586 zero = "/dev/zero"
587 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000588 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000589 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000590 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000591 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000592 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000593 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000594 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000595 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000596 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000597 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000598 self.assertRaises(OverflowError, f.read)
599
Antoine Pitrou6be88762010-05-03 16:48:20 +0000600 def test_flush_error_on_close(self):
601 f = self.open(support.TESTFN, "wb", buffering=0)
602 def bad_flush():
603 raise IOError()
604 f.flush = bad_flush
605 self.assertRaises(IOError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600606 self.assertTrue(f.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000607
608 def test_multi_close(self):
609 f = self.open(support.TESTFN, "wb", buffering=0)
610 f.close()
611 f.close()
612 f.close()
613 self.assertRaises(ValueError, f.flush)
614
Antoine Pitrou328ec742010-09-14 18:37:24 +0000615 def test_RawIOBase_read(self):
616 # Exercise the default RawIOBase.read() implementation (which calls
617 # readinto() internally).
618 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
619 self.assertEqual(rawio.read(2), b"ab")
620 self.assertEqual(rawio.read(2), b"c")
621 self.assertEqual(rawio.read(2), b"d")
622 self.assertEqual(rawio.read(2), None)
623 self.assertEqual(rawio.read(2), b"ef")
624 self.assertEqual(rawio.read(2), b"g")
625 self.assertEqual(rawio.read(2), None)
626 self.assertEqual(rawio.read(2), b"")
627
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400628 def test_types_have_dict(self):
629 test = (
630 self.IOBase(),
631 self.RawIOBase(),
632 self.TextIOBase(),
633 self.StringIO(),
634 self.BytesIO()
635 )
636 for obj in test:
637 self.assertTrue(hasattr(obj, "__dict__"))
638
Ross Lagerwall59142db2011-10-31 20:34:46 +0200639 def test_opener(self):
640 with self.open(support.TESTFN, "w") as f:
641 f.write("egg\n")
642 fd = os.open(support.TESTFN, os.O_RDONLY)
643 def opener(path, flags):
644 return fd
645 with self.open("non-existent", "r", opener=opener) as f:
646 self.assertEqual(f.read(), "egg\n")
647
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200648 def test_fileio_closefd(self):
649 # Issue #4841
650 with self.open(__file__, 'rb') as f1, \
651 self.open(__file__, 'rb') as f2:
652 fileio = self.FileIO(f1.fileno(), closefd=False)
653 # .__init__() must not close f1
654 fileio.__init__(f2.fileno(), closefd=False)
655 f1.readline()
656 # .close() must not close f2
657 fileio.close()
658 f2.readline()
659
660
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000661class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200662
663 def test_IOBase_finalize(self):
664 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
665 # class which inherits IOBase and an object of this class are caught
666 # in a reference cycle and close() is already in the method cache.
667 class MyIO(self.IOBase):
668 def close(self):
669 pass
670
671 # create an instance to populate the method cache
672 MyIO()
673 obj = MyIO()
674 obj.obj = obj
675 wr = weakref.ref(obj)
676 del MyIO
677 del obj
678 support.gc_collect()
679 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000680
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000681class PyIOTest(IOTest):
682 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000683
Guido van Rossuma9e20242007-03-08 00:43:48 +0000684
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000685class CommonBufferedTests:
686 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
687
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000688 def test_detach(self):
689 raw = self.MockRawIO()
690 buf = self.tp(raw)
691 self.assertIs(buf.detach(), raw)
692 self.assertRaises(ValueError, buf.detach)
693
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000694 def test_fileno(self):
695 rawio = self.MockRawIO()
696 bufio = self.tp(rawio)
697
Ezio Melottib3aedd42010-11-20 19:04:17 +0000698 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000699
700 def test_no_fileno(self):
701 # XXX will we always have fileno() function? If so, kill
702 # this test. Else, write it.
703 pass
704
705 def test_invalid_args(self):
706 rawio = self.MockRawIO()
707 bufio = self.tp(rawio)
708 # Invalid whence
709 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200710 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000711
712 def test_override_destructor(self):
713 tp = self.tp
714 record = []
715 class MyBufferedIO(tp):
716 def __del__(self):
717 record.append(1)
718 try:
719 f = super().__del__
720 except AttributeError:
721 pass
722 else:
723 f()
724 def close(self):
725 record.append(2)
726 super().close()
727 def flush(self):
728 record.append(3)
729 super().flush()
730 rawio = self.MockRawIO()
731 bufio = MyBufferedIO(rawio)
732 writable = bufio.writable()
733 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000734 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000735 if writable:
736 self.assertEqual(record, [1, 2, 3])
737 else:
738 self.assertEqual(record, [1, 2])
739
740 def test_context_manager(self):
741 # Test usability as a context manager
742 rawio = self.MockRawIO()
743 bufio = self.tp(rawio)
744 def _with():
745 with bufio:
746 pass
747 _with()
748 # bufio should now be closed, and using it a second time should raise
749 # a ValueError.
750 self.assertRaises(ValueError, _with)
751
752 def test_error_through_destructor(self):
753 # Test that the exception state is not modified by a destructor,
754 # even if close() fails.
755 rawio = self.CloseFailureIO()
756 def f():
757 self.tp(rawio).xyzzy
758 with support.captured_output("stderr") as s:
759 self.assertRaises(AttributeError, f)
760 s = s.getvalue().strip()
761 if s:
762 # The destructor *may* have printed an unraisable error, check it
763 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000764 self.assertTrue(s.startswith("Exception IOError: "), s)
765 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000766
Antoine Pitrou716c4442009-05-23 19:04:03 +0000767 def test_repr(self):
768 raw = self.MockRawIO()
769 b = self.tp(raw)
770 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
771 self.assertEqual(repr(b), "<%s>" % clsname)
772 raw.name = "dummy"
773 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
774 raw.name = b"dummy"
775 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
776
Antoine Pitrou6be88762010-05-03 16:48:20 +0000777 def test_flush_error_on_close(self):
778 raw = self.MockRawIO()
779 def bad_flush():
780 raise IOError()
781 raw.flush = bad_flush
782 b = self.tp(raw)
783 self.assertRaises(IOError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600784 self.assertTrue(b.closed)
785
786 def test_close_error_on_close(self):
787 raw = self.MockRawIO()
788 def bad_flush():
789 raise IOError('flush')
790 def bad_close():
791 raise IOError('close')
792 raw.close = bad_close
793 b = self.tp(raw)
794 b.flush = bad_flush
795 with self.assertRaises(IOError) as err: # exception not swallowed
796 b.close()
797 self.assertEqual(err.exception.args, ('close',))
798 self.assertEqual(err.exception.__context__.args, ('flush',))
799 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000800
801 def test_multi_close(self):
802 raw = self.MockRawIO()
803 b = self.tp(raw)
804 b.close()
805 b.close()
806 b.close()
807 self.assertRaises(ValueError, b.flush)
808
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000809 def test_unseekable(self):
810 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
811 self.assertRaises(self.UnsupportedOperation, bufio.tell)
812 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
813
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000814 def test_readonly_attributes(self):
815 raw = self.MockRawIO()
816 buf = self.tp(raw)
817 x = self.MockRawIO()
818 with self.assertRaises(AttributeError):
819 buf.raw = x
820
Guido van Rossum78892e42007-04-06 17:31:18 +0000821
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200822class SizeofTest:
823
824 @support.cpython_only
825 def test_sizeof(self):
826 bufsize1 = 4096
827 bufsize2 = 8192
828 rawio = self.MockRawIO()
829 bufio = self.tp(rawio, buffer_size=bufsize1)
830 size = sys.getsizeof(bufio) - bufsize1
831 rawio = self.MockRawIO()
832 bufio = self.tp(rawio, buffer_size=bufsize2)
833 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
834
Jesus Ceadc469452012-10-04 12:37:56 +0200835 @support.cpython_only
836 def test_buffer_freeing(self) :
837 bufsize = 4096
838 rawio = self.MockRawIO()
839 bufio = self.tp(rawio, buffer_size=bufsize)
840 size = sys.getsizeof(bufio) - bufsize
841 bufio.close()
842 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +0200843
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000844class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
845 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000846
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000847 def test_constructor(self):
848 rawio = self.MockRawIO([b"abc"])
849 bufio = self.tp(rawio)
850 bufio.__init__(rawio)
851 bufio.__init__(rawio, buffer_size=1024)
852 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000853 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000854 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
855 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
856 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
857 rawio = self.MockRawIO([b"abc"])
858 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000859 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000860
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000861 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000862 for arg in (None, 7):
863 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
864 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000865 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000866 # Invalid args
867 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000868
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000869 def test_read1(self):
870 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
871 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000872 self.assertEqual(b"a", bufio.read(1))
873 self.assertEqual(b"b", bufio.read1(1))
874 self.assertEqual(rawio._reads, 1)
875 self.assertEqual(b"c", bufio.read1(100))
876 self.assertEqual(rawio._reads, 1)
877 self.assertEqual(b"d", bufio.read1(100))
878 self.assertEqual(rawio._reads, 2)
879 self.assertEqual(b"efg", bufio.read1(100))
880 self.assertEqual(rawio._reads, 3)
881 self.assertEqual(b"", bufio.read1(100))
882 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000883 # Invalid args
884 self.assertRaises(ValueError, bufio.read1, -1)
885
886 def test_readinto(self):
887 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
888 bufio = self.tp(rawio)
889 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000890 self.assertEqual(bufio.readinto(b), 2)
891 self.assertEqual(b, b"ab")
892 self.assertEqual(bufio.readinto(b), 2)
893 self.assertEqual(b, b"cd")
894 self.assertEqual(bufio.readinto(b), 2)
895 self.assertEqual(b, b"ef")
896 self.assertEqual(bufio.readinto(b), 1)
897 self.assertEqual(b, b"gf")
898 self.assertEqual(bufio.readinto(b), 0)
899 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +0200900 rawio = self.MockRawIO((b"abc", None))
901 bufio = self.tp(rawio)
902 self.assertEqual(bufio.readinto(b), 2)
903 self.assertEqual(b, b"ab")
904 self.assertEqual(bufio.readinto(b), 1)
905 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000906
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000907 def test_readlines(self):
908 def bufio():
909 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
910 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000911 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
912 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
913 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000914
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000915 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000916 data = b"abcdefghi"
917 dlen = len(data)
918
919 tests = [
920 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
921 [ 100, [ 3, 3, 3], [ dlen ] ],
922 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
923 ]
924
925 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000926 rawio = self.MockFileIO(data)
927 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000928 pos = 0
929 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000930 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000931 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000932 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000933 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000934
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000935 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000936 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000937 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
938 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000939 self.assertEqual(b"abcd", bufio.read(6))
940 self.assertEqual(b"e", bufio.read(1))
941 self.assertEqual(b"fg", bufio.read())
942 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200943 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000944 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000945
Victor Stinnera80987f2011-05-25 22:47:16 +0200946 rawio = self.MockRawIO((b"a", None, None))
947 self.assertEqual(b"a", rawio.readall())
948 self.assertIsNone(rawio.readall())
949
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000950 def test_read_past_eof(self):
951 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
952 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000953
Ezio Melottib3aedd42010-11-20 19:04:17 +0000954 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000955
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000956 def test_read_all(self):
957 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
958 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000959
Ezio Melottib3aedd42010-11-20 19:04:17 +0000960 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000961
Victor Stinner45df8202010-04-28 22:31:17 +0000962 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000963 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000964 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000965 try:
966 # Write out many bytes with exactly the same number of 0's,
967 # 1's... 255's. This will help us check that concurrent reading
968 # doesn't duplicate or forget contents.
969 N = 1000
970 l = list(range(256)) * N
971 random.shuffle(l)
972 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000973 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000974 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000975 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000976 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000977 errors = []
978 results = []
979 def f():
980 try:
981 # Intra-buffer read then buffer-flushing read
982 for n in cycle([1, 19]):
983 s = bufio.read(n)
984 if not s:
985 break
986 # list.append() is atomic
987 results.append(s)
988 except Exception as e:
989 errors.append(e)
990 raise
991 threads = [threading.Thread(target=f) for x in range(20)]
992 for t in threads:
993 t.start()
994 time.sleep(0.02) # yield
995 for t in threads:
996 t.join()
997 self.assertFalse(errors,
998 "the following exceptions were caught: %r" % errors)
999 s = b''.join(results)
1000 for i in range(256):
1001 c = bytes(bytearray([i]))
1002 self.assertEqual(s.count(c), N)
1003 finally:
1004 support.unlink(support.TESTFN)
1005
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001006 def test_unseekable(self):
1007 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1008 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1009 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1010 bufio.read(1)
1011 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1012 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1013
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001014 def test_misbehaved_io(self):
1015 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1016 bufio = self.tp(rawio)
1017 self.assertRaises(IOError, bufio.seek, 0)
1018 self.assertRaises(IOError, bufio.tell)
1019
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001020 def test_no_extraneous_read(self):
1021 # Issue #9550; when the raw IO object has satisfied the read request,
1022 # we should not issue any additional reads, otherwise it may block
1023 # (e.g. socket).
1024 bufsize = 16
1025 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1026 rawio = self.MockRawIO([b"x" * n])
1027 bufio = self.tp(rawio, bufsize)
1028 self.assertEqual(bufio.read(n), b"x" * n)
1029 # Simple case: one raw read is enough to satisfy the request.
1030 self.assertEqual(rawio._extraneous_reads, 0,
1031 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1032 # A more complex case where two raw reads are needed to satisfy
1033 # the request.
1034 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1035 bufio = self.tp(rawio, bufsize)
1036 self.assertEqual(bufio.read(n), b"x" * n)
1037 self.assertEqual(rawio._extraneous_reads, 0,
1038 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1039
1040
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001041class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001042 tp = io.BufferedReader
1043
1044 def test_constructor(self):
1045 BufferedReaderTest.test_constructor(self)
1046 # The allocation can succeed on 32-bit builds, e.g. with more
1047 # than 2GB RAM and a 64-bit kernel.
1048 if sys.maxsize > 0x7FFFFFFF:
1049 rawio = self.MockRawIO()
1050 bufio = self.tp(rawio)
1051 self.assertRaises((OverflowError, MemoryError, ValueError),
1052 bufio.__init__, rawio, sys.maxsize)
1053
1054 def test_initialization(self):
1055 rawio = self.MockRawIO([b"abc"])
1056 bufio = self.tp(rawio)
1057 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1058 self.assertRaises(ValueError, bufio.read)
1059 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1060 self.assertRaises(ValueError, bufio.read)
1061 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1062 self.assertRaises(ValueError, bufio.read)
1063
1064 def test_misbehaved_io_read(self):
1065 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1066 bufio = self.tp(rawio)
1067 # _pyio.BufferedReader seems to implement reading different, so that
1068 # checking this is not so easy.
1069 self.assertRaises(IOError, bufio.read, 10)
1070
1071 def test_garbage_collection(self):
1072 # C BufferedReader objects are collected.
1073 # The Python version has __del__, so it ends into gc.garbage instead
1074 rawio = self.FileIO(support.TESTFN, "w+b")
1075 f = self.tp(rawio)
1076 f.f = f
1077 wr = weakref.ref(f)
1078 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +00001079 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001080 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001081
1082class PyBufferedReaderTest(BufferedReaderTest):
1083 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001084
Guido van Rossuma9e20242007-03-08 00:43:48 +00001085
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001086class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1087 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001088
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001089 def test_constructor(self):
1090 rawio = self.MockRawIO()
1091 bufio = self.tp(rawio)
1092 bufio.__init__(rawio)
1093 bufio.__init__(rawio, buffer_size=1024)
1094 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001095 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001096 bufio.flush()
1097 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1098 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1099 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1100 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001101 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001102 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001103 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001104
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001105 def test_detach_flush(self):
1106 raw = self.MockRawIO()
1107 buf = self.tp(raw)
1108 buf.write(b"howdy!")
1109 self.assertFalse(raw._write_stack)
1110 buf.detach()
1111 self.assertEqual(raw._write_stack, [b"howdy!"])
1112
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001113 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001114 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001115 writer = self.MockRawIO()
1116 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001117 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001118 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001119
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001120 def test_write_overflow(self):
1121 writer = self.MockRawIO()
1122 bufio = self.tp(writer, 8)
1123 contents = b"abcdefghijklmnop"
1124 for n in range(0, len(contents), 3):
1125 bufio.write(contents[n:n+3])
1126 flushed = b"".join(writer._write_stack)
1127 # At least (total - 8) bytes were implicitly flushed, perhaps more
1128 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001129 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001130
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001131 def check_writes(self, intermediate_func):
1132 # Lots of writes, test the flushed output is as expected.
1133 contents = bytes(range(256)) * 1000
1134 n = 0
1135 writer = self.MockRawIO()
1136 bufio = self.tp(writer, 13)
1137 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1138 def gen_sizes():
1139 for size in count(1):
1140 for i in range(15):
1141 yield size
1142 sizes = gen_sizes()
1143 while n < len(contents):
1144 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001145 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001146 intermediate_func(bufio)
1147 n += size
1148 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001149 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001150
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001151 def test_writes(self):
1152 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001153
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001154 def test_writes_and_flushes(self):
1155 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001156
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001157 def test_writes_and_seeks(self):
1158 def _seekabs(bufio):
1159 pos = bufio.tell()
1160 bufio.seek(pos + 1, 0)
1161 bufio.seek(pos - 1, 0)
1162 bufio.seek(pos, 0)
1163 self.check_writes(_seekabs)
1164 def _seekrel(bufio):
1165 pos = bufio.seek(0, 1)
1166 bufio.seek(+1, 1)
1167 bufio.seek(-1, 1)
1168 bufio.seek(pos, 0)
1169 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001170
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001171 def test_writes_and_truncates(self):
1172 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001173
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001174 def test_write_non_blocking(self):
1175 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001176 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001177
Ezio Melottib3aedd42010-11-20 19:04:17 +00001178 self.assertEqual(bufio.write(b"abcd"), 4)
1179 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001180 # 1 byte will be written, the rest will be buffered
1181 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001182 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001183
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001184 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1185 raw.block_on(b"0")
1186 try:
1187 bufio.write(b"opqrwxyz0123456789")
1188 except self.BlockingIOError as e:
1189 written = e.characters_written
1190 else:
1191 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001192 self.assertEqual(written, 16)
1193 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001194 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001195
Ezio Melottib3aedd42010-11-20 19:04:17 +00001196 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001197 s = raw.pop_written()
1198 # Previously buffered bytes were flushed
1199 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001200
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001201 def test_write_and_rewind(self):
1202 raw = io.BytesIO()
1203 bufio = self.tp(raw, 4)
1204 self.assertEqual(bufio.write(b"abcdef"), 6)
1205 self.assertEqual(bufio.tell(), 6)
1206 bufio.seek(0, 0)
1207 self.assertEqual(bufio.write(b"XY"), 2)
1208 bufio.seek(6, 0)
1209 self.assertEqual(raw.getvalue(), b"XYcdef")
1210 self.assertEqual(bufio.write(b"123456"), 6)
1211 bufio.flush()
1212 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001213
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001214 def test_flush(self):
1215 writer = self.MockRawIO()
1216 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001217 bufio.write(b"abc")
1218 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001219 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001220
Antoine Pitrou131a4892012-10-16 22:57:11 +02001221 def test_writelines(self):
1222 l = [b'ab', b'cd', b'ef']
1223 writer = self.MockRawIO()
1224 bufio = self.tp(writer, 8)
1225 bufio.writelines(l)
1226 bufio.flush()
1227 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1228
1229 def test_writelines_userlist(self):
1230 l = UserList([b'ab', b'cd', b'ef'])
1231 writer = self.MockRawIO()
1232 bufio = self.tp(writer, 8)
1233 bufio.writelines(l)
1234 bufio.flush()
1235 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1236
1237 def test_writelines_error(self):
1238 writer = self.MockRawIO()
1239 bufio = self.tp(writer, 8)
1240 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1241 self.assertRaises(TypeError, bufio.writelines, None)
1242 self.assertRaises(TypeError, bufio.writelines, 'abc')
1243
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001244 def test_destructor(self):
1245 writer = self.MockRawIO()
1246 bufio = self.tp(writer, 8)
1247 bufio.write(b"abc")
1248 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001249 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001250 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001251
1252 def test_truncate(self):
1253 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001254 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001255 bufio = self.tp(raw, 8)
1256 bufio.write(b"abcdef")
1257 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001258 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001259 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001260 self.assertEqual(f.read(), b"abc")
1261
Victor Stinner45df8202010-04-28 22:31:17 +00001262 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001263 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001264 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001265 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001266 # Write out many bytes from many threads and test they were
1267 # all flushed.
1268 N = 1000
1269 contents = bytes(range(256)) * N
1270 sizes = cycle([1, 19])
1271 n = 0
1272 queue = deque()
1273 while n < len(contents):
1274 size = next(sizes)
1275 queue.append(contents[n:n+size])
1276 n += size
1277 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001278 # We use a real file object because it allows us to
1279 # exercise situations where the GIL is released before
1280 # writing the buffer to the raw streams. This is in addition
1281 # to concurrency issues due to switching threads in the middle
1282 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001283 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001284 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001285 errors = []
1286 def f():
1287 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001288 while True:
1289 try:
1290 s = queue.popleft()
1291 except IndexError:
1292 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001293 bufio.write(s)
1294 except Exception as e:
1295 errors.append(e)
1296 raise
1297 threads = [threading.Thread(target=f) for x in range(20)]
1298 for t in threads:
1299 t.start()
1300 time.sleep(0.02) # yield
1301 for t in threads:
1302 t.join()
1303 self.assertFalse(errors,
1304 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001305 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001306 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001307 s = f.read()
1308 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001309 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001310 finally:
1311 support.unlink(support.TESTFN)
1312
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001313 def test_misbehaved_io(self):
1314 rawio = self.MisbehavedRawIO()
1315 bufio = self.tp(rawio, 5)
1316 self.assertRaises(IOError, bufio.seek, 0)
1317 self.assertRaises(IOError, bufio.tell)
1318 self.assertRaises(IOError, bufio.write, b"abcdef")
1319
Florent Xicluna109d5732012-07-07 17:03:22 +02001320 def test_max_buffer_size_removal(self):
1321 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001322 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001323
Benjamin Peterson68623612012-12-20 11:53:11 -06001324 def test_write_error_on_close(self):
1325 raw = self.MockRawIO()
1326 def bad_write(b):
1327 raise IOError()
1328 raw.write = bad_write
1329 b = self.tp(raw)
1330 b.write(b'spam')
1331 self.assertRaises(IOError, b.close) # exception not swallowed
1332 self.assertTrue(b.closed)
1333
Benjamin Peterson59406a92009-03-26 17:10:29 +00001334
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001335class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001336 tp = io.BufferedWriter
1337
1338 def test_constructor(self):
1339 BufferedWriterTest.test_constructor(self)
1340 # The allocation can succeed on 32-bit builds, e.g. with more
1341 # than 2GB RAM and a 64-bit kernel.
1342 if sys.maxsize > 0x7FFFFFFF:
1343 rawio = self.MockRawIO()
1344 bufio = self.tp(rawio)
1345 self.assertRaises((OverflowError, MemoryError, ValueError),
1346 bufio.__init__, rawio, sys.maxsize)
1347
1348 def test_initialization(self):
1349 rawio = self.MockRawIO()
1350 bufio = self.tp(rawio)
1351 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1352 self.assertRaises(ValueError, bufio.write, b"def")
1353 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1354 self.assertRaises(ValueError, bufio.write, b"def")
1355 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1356 self.assertRaises(ValueError, bufio.write, b"def")
1357
1358 def test_garbage_collection(self):
1359 # C BufferedWriter objects are collected, and collecting them flushes
1360 # all data to disk.
1361 # The Python version has __del__, so it ends into gc.garbage instead
1362 rawio = self.FileIO(support.TESTFN, "w+b")
1363 f = self.tp(rawio)
1364 f.write(b"123xxx")
1365 f.x = f
1366 wr = weakref.ref(f)
1367 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001368 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001369 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001370 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001371 self.assertEqual(f.read(), b"123xxx")
1372
1373
1374class PyBufferedWriterTest(BufferedWriterTest):
1375 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001376
Guido van Rossum01a27522007-03-07 01:00:12 +00001377class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001378
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001379 def test_constructor(self):
1380 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001381 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001382
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001383 def test_detach(self):
1384 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1385 self.assertRaises(self.UnsupportedOperation, pair.detach)
1386
Florent Xicluna109d5732012-07-07 17:03:22 +02001387 def test_constructor_max_buffer_size_removal(self):
1388 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001389 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001390
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001391 def test_constructor_with_not_readable(self):
1392 class NotReadable(MockRawIO):
1393 def readable(self):
1394 return False
1395
1396 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1397
1398 def test_constructor_with_not_writeable(self):
1399 class NotWriteable(MockRawIO):
1400 def writable(self):
1401 return False
1402
1403 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1404
1405 def test_read(self):
1406 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1407
1408 self.assertEqual(pair.read(3), b"abc")
1409 self.assertEqual(pair.read(1), b"d")
1410 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001411 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1412 self.assertEqual(pair.read(None), b"abc")
1413
1414 def test_readlines(self):
1415 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1416 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1417 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1418 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001419
1420 def test_read1(self):
1421 # .read1() is delegated to the underlying reader object, so this test
1422 # can be shallow.
1423 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1424
1425 self.assertEqual(pair.read1(3), b"abc")
1426
1427 def test_readinto(self):
1428 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1429
1430 data = bytearray(5)
1431 self.assertEqual(pair.readinto(data), 5)
1432 self.assertEqual(data, b"abcde")
1433
1434 def test_write(self):
1435 w = self.MockRawIO()
1436 pair = self.tp(self.MockRawIO(), w)
1437
1438 pair.write(b"abc")
1439 pair.flush()
1440 pair.write(b"def")
1441 pair.flush()
1442 self.assertEqual(w._write_stack, [b"abc", b"def"])
1443
1444 def test_peek(self):
1445 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1446
1447 self.assertTrue(pair.peek(3).startswith(b"abc"))
1448 self.assertEqual(pair.read(3), b"abc")
1449
1450 def test_readable(self):
1451 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1452 self.assertTrue(pair.readable())
1453
1454 def test_writeable(self):
1455 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1456 self.assertTrue(pair.writable())
1457
1458 def test_seekable(self):
1459 # BufferedRWPairs are never seekable, even if their readers and writers
1460 # are.
1461 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1462 self.assertFalse(pair.seekable())
1463
1464 # .flush() is delegated to the underlying writer object and has been
1465 # tested in the test_write method.
1466
1467 def test_close_and_closed(self):
1468 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1469 self.assertFalse(pair.closed)
1470 pair.close()
1471 self.assertTrue(pair.closed)
1472
1473 def test_isatty(self):
1474 class SelectableIsAtty(MockRawIO):
1475 def __init__(self, isatty):
1476 MockRawIO.__init__(self)
1477 self._isatty = isatty
1478
1479 def isatty(self):
1480 return self._isatty
1481
1482 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1483 self.assertFalse(pair.isatty())
1484
1485 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1486 self.assertTrue(pair.isatty())
1487
1488 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1489 self.assertTrue(pair.isatty())
1490
1491 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1492 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001493
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001494class CBufferedRWPairTest(BufferedRWPairTest):
1495 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001496
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001497class PyBufferedRWPairTest(BufferedRWPairTest):
1498 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001499
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001500
1501class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1502 read_mode = "rb+"
1503 write_mode = "wb+"
1504
1505 def test_constructor(self):
1506 BufferedReaderTest.test_constructor(self)
1507 BufferedWriterTest.test_constructor(self)
1508
1509 def test_read_and_write(self):
1510 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001511 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001512
1513 self.assertEqual(b"as", rw.read(2))
1514 rw.write(b"ddd")
1515 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001516 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001517 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001518 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001519
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001520 def test_seek_and_tell(self):
1521 raw = self.BytesIO(b"asdfghjkl")
1522 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001523
Ezio Melottib3aedd42010-11-20 19:04:17 +00001524 self.assertEqual(b"as", rw.read(2))
1525 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001526 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001527 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001528
Antoine Pitroue05565e2011-08-20 14:39:23 +02001529 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001530 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001531 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001532 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001533 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001534 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001535 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001536 self.assertEqual(7, rw.tell())
1537 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001538 rw.flush()
1539 self.assertEqual(b"asdf123fl", raw.getvalue())
1540
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001541 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001542
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001543 def check_flush_and_read(self, read_func):
1544 raw = self.BytesIO(b"abcdefghi")
1545 bufio = self.tp(raw)
1546
Ezio Melottib3aedd42010-11-20 19:04:17 +00001547 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001548 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001549 self.assertEqual(b"ef", read_func(bufio, 2))
1550 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001551 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001552 self.assertEqual(6, bufio.tell())
1553 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001554 raw.seek(0, 0)
1555 raw.write(b"XYZ")
1556 # flush() resets the read buffer
1557 bufio.flush()
1558 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001559 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001560
1561 def test_flush_and_read(self):
1562 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1563
1564 def test_flush_and_readinto(self):
1565 def _readinto(bufio, n=-1):
1566 b = bytearray(n if n >= 0 else 9999)
1567 n = bufio.readinto(b)
1568 return bytes(b[:n])
1569 self.check_flush_and_read(_readinto)
1570
1571 def test_flush_and_peek(self):
1572 def _peek(bufio, n=-1):
1573 # This relies on the fact that the buffer can contain the whole
1574 # raw stream, otherwise peek() can return less.
1575 b = bufio.peek(n)
1576 if n != -1:
1577 b = b[:n]
1578 bufio.seek(len(b), 1)
1579 return b
1580 self.check_flush_and_read(_peek)
1581
1582 def test_flush_and_write(self):
1583 raw = self.BytesIO(b"abcdefghi")
1584 bufio = self.tp(raw)
1585
1586 bufio.write(b"123")
1587 bufio.flush()
1588 bufio.write(b"45")
1589 bufio.flush()
1590 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001591 self.assertEqual(b"12345fghi", raw.getvalue())
1592 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001593
1594 def test_threads(self):
1595 BufferedReaderTest.test_threads(self)
1596 BufferedWriterTest.test_threads(self)
1597
1598 def test_writes_and_peek(self):
1599 def _peek(bufio):
1600 bufio.peek(1)
1601 self.check_writes(_peek)
1602 def _peek(bufio):
1603 pos = bufio.tell()
1604 bufio.seek(-1, 1)
1605 bufio.peek(1)
1606 bufio.seek(pos, 0)
1607 self.check_writes(_peek)
1608
1609 def test_writes_and_reads(self):
1610 def _read(bufio):
1611 bufio.seek(-1, 1)
1612 bufio.read(1)
1613 self.check_writes(_read)
1614
1615 def test_writes_and_read1s(self):
1616 def _read1(bufio):
1617 bufio.seek(-1, 1)
1618 bufio.read1(1)
1619 self.check_writes(_read1)
1620
1621 def test_writes_and_readintos(self):
1622 def _read(bufio):
1623 bufio.seek(-1, 1)
1624 bufio.readinto(bytearray(1))
1625 self.check_writes(_read)
1626
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001627 def test_write_after_readahead(self):
1628 # Issue #6629: writing after the buffer was filled by readahead should
1629 # first rewind the raw stream.
1630 for overwrite_size in [1, 5]:
1631 raw = self.BytesIO(b"A" * 10)
1632 bufio = self.tp(raw, 4)
1633 # Trigger readahead
1634 self.assertEqual(bufio.read(1), b"A")
1635 self.assertEqual(bufio.tell(), 1)
1636 # Overwriting should rewind the raw stream if it needs so
1637 bufio.write(b"B" * overwrite_size)
1638 self.assertEqual(bufio.tell(), overwrite_size + 1)
1639 # If the write size was smaller than the buffer size, flush() and
1640 # check that rewind happens.
1641 bufio.flush()
1642 self.assertEqual(bufio.tell(), overwrite_size + 1)
1643 s = raw.getvalue()
1644 self.assertEqual(s,
1645 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1646
Antoine Pitrou7c404892011-05-13 00:13:33 +02001647 def test_write_rewind_write(self):
1648 # Various combinations of reading / writing / seeking backwards / writing again
1649 def mutate(bufio, pos1, pos2):
1650 assert pos2 >= pos1
1651 # Fill the buffer
1652 bufio.seek(pos1)
1653 bufio.read(pos2 - pos1)
1654 bufio.write(b'\x02')
1655 # This writes earlier than the previous write, but still inside
1656 # the buffer.
1657 bufio.seek(pos1)
1658 bufio.write(b'\x01')
1659
1660 b = b"\x80\x81\x82\x83\x84"
1661 for i in range(0, len(b)):
1662 for j in range(i, len(b)):
1663 raw = self.BytesIO(b)
1664 bufio = self.tp(raw, 100)
1665 mutate(bufio, i, j)
1666 bufio.flush()
1667 expected = bytearray(b)
1668 expected[j] = 2
1669 expected[i] = 1
1670 self.assertEqual(raw.getvalue(), expected,
1671 "failed result for i=%d, j=%d" % (i, j))
1672
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001673 def test_truncate_after_read_or_write(self):
1674 raw = self.BytesIO(b"A" * 10)
1675 bufio = self.tp(raw, 100)
1676 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1677 self.assertEqual(bufio.truncate(), 2)
1678 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1679 self.assertEqual(bufio.truncate(), 4)
1680
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001681 def test_misbehaved_io(self):
1682 BufferedReaderTest.test_misbehaved_io(self)
1683 BufferedWriterTest.test_misbehaved_io(self)
1684
Antoine Pitroue05565e2011-08-20 14:39:23 +02001685 def test_interleaved_read_write(self):
1686 # Test for issue #12213
1687 with self.BytesIO(b'abcdefgh') as raw:
1688 with self.tp(raw, 100) as f:
1689 f.write(b"1")
1690 self.assertEqual(f.read(1), b'b')
1691 f.write(b'2')
1692 self.assertEqual(f.read1(1), b'd')
1693 f.write(b'3')
1694 buf = bytearray(1)
1695 f.readinto(buf)
1696 self.assertEqual(buf, b'f')
1697 f.write(b'4')
1698 self.assertEqual(f.peek(1), b'h')
1699 f.flush()
1700 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1701
1702 with self.BytesIO(b'abc') as raw:
1703 with self.tp(raw, 100) as f:
1704 self.assertEqual(f.read(1), b'a')
1705 f.write(b"2")
1706 self.assertEqual(f.read(1), b'c')
1707 f.flush()
1708 self.assertEqual(raw.getvalue(), b'a2c')
1709
1710 def test_interleaved_readline_write(self):
1711 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1712 with self.tp(raw) as f:
1713 f.write(b'1')
1714 self.assertEqual(f.readline(), b'b\n')
1715 f.write(b'2')
1716 self.assertEqual(f.readline(), b'def\n')
1717 f.write(b'3')
1718 self.assertEqual(f.readline(), b'\n')
1719 f.flush()
1720 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1721
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001722 # You can't construct a BufferedRandom over a non-seekable stream.
1723 test_unseekable = None
1724
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001725class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001726 tp = io.BufferedRandom
1727
1728 def test_constructor(self):
1729 BufferedRandomTest.test_constructor(self)
1730 # The allocation can succeed on 32-bit builds, e.g. with more
1731 # than 2GB RAM and a 64-bit kernel.
1732 if sys.maxsize > 0x7FFFFFFF:
1733 rawio = self.MockRawIO()
1734 bufio = self.tp(rawio)
1735 self.assertRaises((OverflowError, MemoryError, ValueError),
1736 bufio.__init__, rawio, sys.maxsize)
1737
1738 def test_garbage_collection(self):
1739 CBufferedReaderTest.test_garbage_collection(self)
1740 CBufferedWriterTest.test_garbage_collection(self)
1741
1742class PyBufferedRandomTest(BufferedRandomTest):
1743 tp = pyio.BufferedRandom
1744
1745
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001746# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1747# properties:
1748# - A single output character can correspond to many bytes of input.
1749# - The number of input bytes to complete the character can be
1750# undetermined until the last input byte is received.
1751# - The number of input bytes can vary depending on previous input.
1752# - A single input byte can correspond to many characters of output.
1753# - The number of output characters can be undetermined until the
1754# last input byte is received.
1755# - The number of output characters can vary depending on previous input.
1756
1757class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1758 """
1759 For testing seek/tell behavior with a stateful, buffering decoder.
1760
1761 Input is a sequence of words. Words may be fixed-length (length set
1762 by input) or variable-length (period-terminated). In variable-length
1763 mode, extra periods are ignored. Possible words are:
1764 - 'i' followed by a number sets the input length, I (maximum 99).
1765 When I is set to 0, words are space-terminated.
1766 - 'o' followed by a number sets the output length, O (maximum 99).
1767 - Any other word is converted into a word followed by a period on
1768 the output. The output word consists of the input word truncated
1769 or padded out with hyphens to make its length equal to O. If O
1770 is 0, the word is output verbatim without truncating or padding.
1771 I and O are initially set to 1. When I changes, any buffered input is
1772 re-scanned according to the new I. EOF also terminates the last word.
1773 """
1774
1775 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001776 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001777 self.reset()
1778
1779 def __repr__(self):
1780 return '<SID %x>' % id(self)
1781
1782 def reset(self):
1783 self.i = 1
1784 self.o = 1
1785 self.buffer = bytearray()
1786
1787 def getstate(self):
1788 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1789 return bytes(self.buffer), i*100 + o
1790
1791 def setstate(self, state):
1792 buffer, io = state
1793 self.buffer = bytearray(buffer)
1794 i, o = divmod(io, 100)
1795 self.i, self.o = i ^ 1, o ^ 1
1796
1797 def decode(self, input, final=False):
1798 output = ''
1799 for b in input:
1800 if self.i == 0: # variable-length, terminated with period
1801 if b == ord('.'):
1802 if self.buffer:
1803 output += self.process_word()
1804 else:
1805 self.buffer.append(b)
1806 else: # fixed-length, terminate after self.i bytes
1807 self.buffer.append(b)
1808 if len(self.buffer) == self.i:
1809 output += self.process_word()
1810 if final and self.buffer: # EOF terminates the last word
1811 output += self.process_word()
1812 return output
1813
1814 def process_word(self):
1815 output = ''
1816 if self.buffer[0] == ord('i'):
1817 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1818 elif self.buffer[0] == ord('o'):
1819 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1820 else:
1821 output = self.buffer.decode('ascii')
1822 if len(output) < self.o:
1823 output += '-'*self.o # pad out with hyphens
1824 if self.o:
1825 output = output[:self.o] # truncate to output length
1826 output += '.'
1827 self.buffer = bytearray()
1828 return output
1829
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001830 codecEnabled = False
1831
1832 @classmethod
1833 def lookupTestDecoder(cls, name):
1834 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001835 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001836 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001837 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001838 incrementalencoder=None,
1839 streamreader=None, streamwriter=None,
1840 incrementaldecoder=cls)
1841
1842# Register the previous decoder for testing.
1843# Disabled by default, tests will enable it.
1844codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1845
1846
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001847class StatefulIncrementalDecoderTest(unittest.TestCase):
1848 """
1849 Make sure the StatefulIncrementalDecoder actually works.
1850 """
1851
1852 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001853 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001854 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001855 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001856 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001857 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001858 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001859 # I=0, O=6 (variable-length input, fixed-length output)
1860 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1861 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001862 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001863 # I=6, O=3 (fixed-length input > fixed-length output)
1864 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1865 # I=0, then 3; O=29, then 15 (with longer output)
1866 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1867 'a----------------------------.' +
1868 'b----------------------------.' +
1869 'cde--------------------------.' +
1870 'abcdefghijabcde.' +
1871 'a.b------------.' +
1872 '.c.------------.' +
1873 'd.e------------.' +
1874 'k--------------.' +
1875 'l--------------.' +
1876 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001877 ]
1878
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001879 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001880 # Try a few one-shot test cases.
1881 for input, eof, output in self.test_cases:
1882 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001883 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001884
1885 # Also test an unfinished decode, followed by forcing EOF.
1886 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001887 self.assertEqual(d.decode(b'oiabcd'), '')
1888 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001889
1890class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001891
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001892 def setUp(self):
1893 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1894 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001895 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001896
Guido van Rossumd0712812007-04-11 16:32:43 +00001897 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001898 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001899
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001900 def test_constructor(self):
1901 r = self.BytesIO(b"\xc3\xa9\n\n")
1902 b = self.BufferedReader(r, 1000)
1903 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001904 t.__init__(b, encoding="latin-1", newline="\r\n")
1905 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001906 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001907 t.__init__(b, encoding="utf-8", line_buffering=True)
1908 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001909 self.assertEqual(t.line_buffering, True)
1910 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001911 self.assertRaises(TypeError, t.__init__, b, newline=42)
1912 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1913
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001914 def test_detach(self):
1915 r = self.BytesIO()
1916 b = self.BufferedWriter(r)
1917 t = self.TextIOWrapper(b)
1918 self.assertIs(t.detach(), b)
1919
1920 t = self.TextIOWrapper(b, encoding="ascii")
1921 t.write("howdy")
1922 self.assertFalse(r.getvalue())
1923 t.detach()
1924 self.assertEqual(r.getvalue(), b"howdy")
1925 self.assertRaises(ValueError, t.detach)
1926
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001927 def test_repr(self):
1928 raw = self.BytesIO("hello".encode("utf-8"))
1929 b = self.BufferedReader(raw)
1930 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001931 modname = self.TextIOWrapper.__module__
1932 self.assertEqual(repr(t),
1933 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1934 raw.name = "dummy"
1935 self.assertEqual(repr(t),
1936 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001937 t.mode = "r"
1938 self.assertEqual(repr(t),
1939 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001940 raw.name = b"dummy"
1941 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001942 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001943
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001944 def test_line_buffering(self):
1945 r = self.BytesIO()
1946 b = self.BufferedWriter(r, 1000)
1947 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001948 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001949 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001950 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001951 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001952 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001953 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001954
Victor Stinnerf86a5e82012-06-05 13:43:22 +02001955 def test_default_encoding(self):
1956 old_environ = dict(os.environ)
1957 try:
1958 # try to get a user preferred encoding different than the current
1959 # locale encoding to check that TextIOWrapper() uses the current
1960 # locale encoding and not the user preferred encoding
1961 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
1962 if key in os.environ:
1963 del os.environ[key]
1964
1965 current_locale_encoding = locale.getpreferredencoding(False)
1966 b = self.BytesIO()
1967 t = self.TextIOWrapper(b)
1968 self.assertEqual(t.encoding, current_locale_encoding)
1969 finally:
1970 os.environ.clear()
1971 os.environ.update(old_environ)
1972
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001973 def test_encoding(self):
1974 # Check the encoding attribute is always set, and valid
1975 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001976 t = self.TextIOWrapper(b, encoding="utf-8")
1977 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001978 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001979 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001980 codecs.lookup(t.encoding)
1981
1982 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001983 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001984 b = self.BytesIO(b"abc\n\xff\n")
1985 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001986 self.assertRaises(UnicodeError, t.read)
1987 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001988 b = self.BytesIO(b"abc\n\xff\n")
1989 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001990 self.assertRaises(UnicodeError, t.read)
1991 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001992 b = self.BytesIO(b"abc\n\xff\n")
1993 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001994 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001995 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001996 b = self.BytesIO(b"abc\n\xff\n")
1997 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001998 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001999
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002000 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002001 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002002 b = self.BytesIO()
2003 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002004 self.assertRaises(UnicodeError, t.write, "\xff")
2005 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002006 b = self.BytesIO()
2007 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002008 self.assertRaises(UnicodeError, t.write, "\xff")
2009 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002010 b = self.BytesIO()
2011 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002012 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002013 t.write("abc\xffdef\n")
2014 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002015 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002016 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002017 b = self.BytesIO()
2018 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002019 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002020 t.write("abc\xffdef\n")
2021 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002022 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002023
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002024 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002025 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2026
2027 tests = [
2028 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002029 [ '', input_lines ],
2030 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2031 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2032 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002033 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002034 encodings = (
2035 'utf-8', 'latin-1',
2036 'utf-16', 'utf-16-le', 'utf-16-be',
2037 'utf-32', 'utf-32-le', 'utf-32-be',
2038 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002039
Guido van Rossum8358db22007-08-18 21:39:55 +00002040 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002041 # character in TextIOWrapper._pending_line.
2042 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002043 # XXX: str.encode() should return bytes
2044 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002045 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002046 for bufsize in range(1, 10):
2047 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002048 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2049 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002050 encoding=encoding)
2051 if do_reads:
2052 got_lines = []
2053 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002054 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002055 if c2 == '':
2056 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002057 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002058 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002059 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002060 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002061
2062 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002063 self.assertEqual(got_line, exp_line)
2064 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002065
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002066 def test_newlines_input(self):
2067 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002068 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2069 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002070 (None, normalized.decode("ascii").splitlines(keepends=True)),
2071 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002072 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2073 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2074 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002075 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002076 buf = self.BytesIO(testdata)
2077 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002078 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002079 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002080 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002081
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002082 def test_newlines_output(self):
2083 testdict = {
2084 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2085 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2086 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2087 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2088 }
2089 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2090 for newline, expected in tests:
2091 buf = self.BytesIO()
2092 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2093 txt.write("AAA\nB")
2094 txt.write("BB\nCCC\n")
2095 txt.write("X\rY\r\nZ")
2096 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002097 self.assertEqual(buf.closed, False)
2098 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002099
2100 def test_destructor(self):
2101 l = []
2102 base = self.BytesIO
2103 class MyBytesIO(base):
2104 def close(self):
2105 l.append(self.getvalue())
2106 base.close(self)
2107 b = MyBytesIO()
2108 t = self.TextIOWrapper(b, encoding="ascii")
2109 t.write("abc")
2110 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002111 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002112 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002113
2114 def test_override_destructor(self):
2115 record = []
2116 class MyTextIO(self.TextIOWrapper):
2117 def __del__(self):
2118 record.append(1)
2119 try:
2120 f = super().__del__
2121 except AttributeError:
2122 pass
2123 else:
2124 f()
2125 def close(self):
2126 record.append(2)
2127 super().close()
2128 def flush(self):
2129 record.append(3)
2130 super().flush()
2131 b = self.BytesIO()
2132 t = MyTextIO(b, encoding="ascii")
2133 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002134 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002135 self.assertEqual(record, [1, 2, 3])
2136
2137 def test_error_through_destructor(self):
2138 # Test that the exception state is not modified by a destructor,
2139 # even if close() fails.
2140 rawio = self.CloseFailureIO()
2141 def f():
2142 self.TextIOWrapper(rawio).xyzzy
2143 with support.captured_output("stderr") as s:
2144 self.assertRaises(AttributeError, f)
2145 s = s.getvalue().strip()
2146 if s:
2147 # The destructor *may* have printed an unraisable error, check it
2148 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002149 self.assertTrue(s.startswith("Exception IOError: "), s)
2150 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002151
Guido van Rossum9b76da62007-04-11 01:09:03 +00002152 # Systematic tests of the text I/O API
2153
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002154 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002155 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002156 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002157 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002158 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002159 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002160 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002161 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002162 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002163 self.assertEqual(f.tell(), 0)
2164 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002165 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002166 self.assertEqual(f.seek(0), 0)
2167 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002168 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002169 self.assertEqual(f.read(2), "ab")
2170 self.assertEqual(f.read(1), "c")
2171 self.assertEqual(f.read(1), "")
2172 self.assertEqual(f.read(), "")
2173 self.assertEqual(f.tell(), cookie)
2174 self.assertEqual(f.seek(0), 0)
2175 self.assertEqual(f.seek(0, 2), cookie)
2176 self.assertEqual(f.write("def"), 3)
2177 self.assertEqual(f.seek(cookie), cookie)
2178 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002179 if enc.startswith("utf"):
2180 self.multi_line_test(f, enc)
2181 f.close()
2182
2183 def multi_line_test(self, f, enc):
2184 f.seek(0)
2185 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002186 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002187 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002188 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002189 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002190 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002191 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002192 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002193 wlines.append((f.tell(), line))
2194 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002195 f.seek(0)
2196 rlines = []
2197 while True:
2198 pos = f.tell()
2199 line = f.readline()
2200 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002201 break
2202 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002203 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002204
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002205 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002206 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002207 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002208 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002209 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002210 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002211 p2 = f.tell()
2212 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002213 self.assertEqual(f.tell(), p0)
2214 self.assertEqual(f.readline(), "\xff\n")
2215 self.assertEqual(f.tell(), p1)
2216 self.assertEqual(f.readline(), "\xff\n")
2217 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002218 f.seek(0)
2219 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002220 self.assertEqual(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002221 self.assertRaises(IOError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002222 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002223 f.close()
2224
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002225 def test_seeking(self):
2226 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002227 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002228 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002229 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002230 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002231 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002232 suffix = bytes(u_suffix.encode("utf-8"))
2233 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002234 with self.open(support.TESTFN, "wb") as f:
2235 f.write(line*2)
2236 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2237 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002238 self.assertEqual(s, str(prefix, "ascii"))
2239 self.assertEqual(f.tell(), prefix_size)
2240 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002241
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002242 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002243 # Regression test for a specific bug
2244 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002245 with self.open(support.TESTFN, "wb") as f:
2246 f.write(data)
2247 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2248 f._CHUNK_SIZE # Just test that it exists
2249 f._CHUNK_SIZE = 2
2250 f.readline()
2251 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002252
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002253 def test_seek_and_tell(self):
2254 #Test seek/tell using the StatefulIncrementalDecoder.
2255 # Make test faster by doing smaller seeks
2256 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002257
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002258 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002259 """Tell/seek to various points within a data stream and ensure
2260 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002261 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002262 f.write(data)
2263 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002264 f = self.open(support.TESTFN, encoding='test_decoder')
2265 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002266 decoded = f.read()
2267 f.close()
2268
Neal Norwitze2b07052008-03-18 19:52:05 +00002269 for i in range(min_pos, len(decoded) + 1): # seek positions
2270 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002271 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002272 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002273 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002274 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002275 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002276 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002277 f.close()
2278
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002279 # Enable the test decoder.
2280 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002281
2282 # Run the tests.
2283 try:
2284 # Try each test case.
2285 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002286 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002287
2288 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002289 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2290 offset = CHUNK_SIZE - len(input)//2
2291 prefix = b'.'*offset
2292 # Don't bother seeking into the prefix (takes too long).
2293 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002294 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002295
2296 # Ensure our test decoder won't interfere with subsequent tests.
2297 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002298 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002299
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002300 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002301 data = "1234567890"
2302 tests = ("utf-16",
2303 "utf-16-le",
2304 "utf-16-be",
2305 "utf-32",
2306 "utf-32-le",
2307 "utf-32-be")
2308 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002309 buf = self.BytesIO()
2310 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002311 # Check if the BOM is written only once (see issue1753).
2312 f.write(data)
2313 f.write(data)
2314 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002315 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002316 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002317 self.assertEqual(f.read(), data * 2)
2318 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002319
Benjamin Petersona1b49012009-03-31 23:11:32 +00002320 def test_unreadable(self):
2321 class UnReadable(self.BytesIO):
2322 def readable(self):
2323 return False
2324 txt = self.TextIOWrapper(UnReadable())
2325 self.assertRaises(IOError, txt.read)
2326
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002327 def test_read_one_by_one(self):
2328 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002329 reads = ""
2330 while True:
2331 c = txt.read(1)
2332 if not c:
2333 break
2334 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002335 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002336
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002337 def test_readlines(self):
2338 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2339 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2340 txt.seek(0)
2341 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2342 txt.seek(0)
2343 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2344
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002345 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002346 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002347 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002348 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002349 reads = ""
2350 while True:
2351 c = txt.read(128)
2352 if not c:
2353 break
2354 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002355 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002356
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002357 def test_writelines(self):
2358 l = ['ab', 'cd', 'ef']
2359 buf = self.BytesIO()
2360 txt = self.TextIOWrapper(buf)
2361 txt.writelines(l)
2362 txt.flush()
2363 self.assertEqual(buf.getvalue(), b'abcdef')
2364
2365 def test_writelines_userlist(self):
2366 l = UserList(['ab', 'cd', 'ef'])
2367 buf = self.BytesIO()
2368 txt = self.TextIOWrapper(buf)
2369 txt.writelines(l)
2370 txt.flush()
2371 self.assertEqual(buf.getvalue(), b'abcdef')
2372
2373 def test_writelines_error(self):
2374 txt = self.TextIOWrapper(self.BytesIO())
2375 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2376 self.assertRaises(TypeError, txt.writelines, None)
2377 self.assertRaises(TypeError, txt.writelines, b'abc')
2378
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002379 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002380 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002381
2382 # read one char at a time
2383 reads = ""
2384 while True:
2385 c = txt.read(1)
2386 if not c:
2387 break
2388 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002389 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002390
2391 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002392 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002393 txt._CHUNK_SIZE = 4
2394
2395 reads = ""
2396 while True:
2397 c = txt.read(4)
2398 if not c:
2399 break
2400 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002401 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002402
2403 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002404 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002405 txt._CHUNK_SIZE = 4
2406
2407 reads = txt.read(4)
2408 reads += txt.read(4)
2409 reads += txt.readline()
2410 reads += txt.readline()
2411 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002412 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002413
2414 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002415 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002416 txt._CHUNK_SIZE = 4
2417
2418 reads = txt.read(4)
2419 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002420 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002421
2422 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002423 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002424 txt._CHUNK_SIZE = 4
2425
2426 reads = txt.read(4)
2427 pos = txt.tell()
2428 txt.seek(0)
2429 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002430 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002431
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002432 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002433 buffer = self.BytesIO(self.testdata)
2434 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002435
2436 self.assertEqual(buffer.seekable(), txt.seekable())
2437
Antoine Pitroue4501852009-05-14 18:55:55 +00002438 def test_append_bom(self):
2439 # The BOM is not written again when appending to a non-empty file
2440 filename = support.TESTFN
2441 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2442 with self.open(filename, 'w', encoding=charset) as f:
2443 f.write('aaa')
2444 pos = f.tell()
2445 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002446 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002447
2448 with self.open(filename, 'a', encoding=charset) as f:
2449 f.write('xxx')
2450 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002451 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002452
2453 def test_seek_bom(self):
2454 # Same test, but when seeking manually
2455 filename = support.TESTFN
2456 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2457 with self.open(filename, 'w', encoding=charset) as f:
2458 f.write('aaa')
2459 pos = f.tell()
2460 with self.open(filename, 'r+', encoding=charset) as f:
2461 f.seek(pos)
2462 f.write('zzz')
2463 f.seek(0)
2464 f.write('bbb')
2465 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002466 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002467
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002468 def test_errors_property(self):
2469 with self.open(support.TESTFN, "w") as f:
2470 self.assertEqual(f.errors, "strict")
2471 with self.open(support.TESTFN, "w", errors="replace") as f:
2472 self.assertEqual(f.errors, "replace")
2473
Brett Cannon31f59292011-02-21 19:29:56 +00002474 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002475 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002476 def test_threads_write(self):
2477 # Issue6750: concurrent writes could duplicate data
2478 event = threading.Event()
2479 with self.open(support.TESTFN, "w", buffering=1) as f:
2480 def run(n):
2481 text = "Thread%03d\n" % n
2482 event.wait()
2483 f.write(text)
2484 threads = [threading.Thread(target=lambda n=x: run(n))
2485 for x in range(20)]
2486 for t in threads:
2487 t.start()
2488 time.sleep(0.02)
2489 event.set()
2490 for t in threads:
2491 t.join()
2492 with self.open(support.TESTFN) as f:
2493 content = f.read()
2494 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002495 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002496
Antoine Pitrou6be88762010-05-03 16:48:20 +00002497 def test_flush_error_on_close(self):
2498 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2499 def bad_flush():
2500 raise IOError()
2501 txt.flush = bad_flush
2502 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002503 self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002504
2505 def test_multi_close(self):
2506 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2507 txt.close()
2508 txt.close()
2509 txt.close()
2510 self.assertRaises(ValueError, txt.flush)
2511
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002512 def test_unseekable(self):
2513 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2514 self.assertRaises(self.UnsupportedOperation, txt.tell)
2515 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2516
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002517 def test_readonly_attributes(self):
2518 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2519 buf = self.BytesIO(self.testdata)
2520 with self.assertRaises(AttributeError):
2521 txt.buffer = buf
2522
Antoine Pitroue96ec682011-07-23 21:46:35 +02002523 def test_rawio(self):
2524 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2525 # that subprocess.Popen() can have the required unbuffered
2526 # semantics with universal_newlines=True.
2527 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2528 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2529 # Reads
2530 self.assertEqual(txt.read(4), 'abcd')
2531 self.assertEqual(txt.readline(), 'efghi\n')
2532 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2533
2534 def test_rawio_write_through(self):
2535 # Issue #12591: with write_through=True, writes don't need a flush
2536 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2537 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2538 write_through=True)
2539 txt.write('1')
2540 txt.write('23\n4')
2541 txt.write('5')
2542 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2543
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002544class CTextIOWrapperTest(TextIOWrapperTest):
2545
2546 def test_initialization(self):
2547 r = self.BytesIO(b"\xc3\xa9\n\n")
2548 b = self.BufferedReader(r, 1000)
2549 t = self.TextIOWrapper(b)
2550 self.assertRaises(TypeError, t.__init__, b, newline=42)
2551 self.assertRaises(ValueError, t.read)
2552 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2553 self.assertRaises(ValueError, t.read)
2554
2555 def test_garbage_collection(self):
2556 # C TextIOWrapper objects are collected, and collecting them flushes
2557 # all data to disk.
2558 # The Python version has __del__, so it ends in gc.garbage instead.
2559 rawio = io.FileIO(support.TESTFN, "wb")
2560 b = self.BufferedWriter(rawio)
2561 t = self.TextIOWrapper(b, encoding="ascii")
2562 t.write("456def")
2563 t.x = t
2564 wr = weakref.ref(t)
2565 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002566 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002567 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002568 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002569 self.assertEqual(f.read(), b"456def")
2570
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002571 def test_rwpair_cleared_before_textio(self):
2572 # Issue 13070: TextIOWrapper's finalization would crash when called
2573 # after the reference to the underlying BufferedRWPair's writer got
2574 # cleared by the GC.
2575 for i in range(1000):
2576 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2577 t1 = self.TextIOWrapper(b1, encoding="ascii")
2578 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2579 t2 = self.TextIOWrapper(b2, encoding="ascii")
2580 # circular references
2581 t1.buddy = t2
2582 t2.buddy = t1
2583 support.gc_collect()
2584
2585
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002586class PyTextIOWrapperTest(TextIOWrapperTest):
2587 pass
2588
2589
2590class IncrementalNewlineDecoderTest(unittest.TestCase):
2591
2592 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002593 # UTF-8 specific tests for a newline decoder
2594 def _check_decode(b, s, **kwargs):
2595 # We exercise getstate() / setstate() as well as decode()
2596 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002597 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002598 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002599 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002600
Antoine Pitrou180a3362008-12-14 16:36:46 +00002601 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002602
Antoine Pitrou180a3362008-12-14 16:36:46 +00002603 _check_decode(b'\xe8', "")
2604 _check_decode(b'\xa2', "")
2605 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002606
Antoine Pitrou180a3362008-12-14 16:36:46 +00002607 _check_decode(b'\xe8', "")
2608 _check_decode(b'\xa2', "")
2609 _check_decode(b'\x88', "\u8888")
2610
2611 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002612 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2613
Antoine Pitrou180a3362008-12-14 16:36:46 +00002614 decoder.reset()
2615 _check_decode(b'\n', "\n")
2616 _check_decode(b'\r', "")
2617 _check_decode(b'', "\n", final=True)
2618 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002619
Antoine Pitrou180a3362008-12-14 16:36:46 +00002620 _check_decode(b'\r', "")
2621 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002622
Antoine Pitrou180a3362008-12-14 16:36:46 +00002623 _check_decode(b'\r\r\n', "\n\n")
2624 _check_decode(b'\r', "")
2625 _check_decode(b'\r', "\n")
2626 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002627
Antoine Pitrou180a3362008-12-14 16:36:46 +00002628 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2629 _check_decode(b'\xe8\xa2\x88', "\u8888")
2630 _check_decode(b'\n', "\n")
2631 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2632 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002633
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002634 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002635 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002636 if encoding is not None:
2637 encoder = codecs.getincrementalencoder(encoding)()
2638 def _decode_bytewise(s):
2639 # Decode one byte at a time
2640 for b in encoder.encode(s):
2641 result.append(decoder.decode(bytes([b])))
2642 else:
2643 encoder = None
2644 def _decode_bytewise(s):
2645 # Decode one char at a time
2646 for c in s:
2647 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002648 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002649 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002650 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002651 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002652 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002653 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002654 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002655 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002656 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002657 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002658 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002659 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002660 input = "abc"
2661 if encoder is not None:
2662 encoder.reset()
2663 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002664 self.assertEqual(decoder.decode(input), "abc")
2665 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002666
2667 def test_newline_decoder(self):
2668 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002669 # None meaning the IncrementalNewlineDecoder takes unicode input
2670 # rather than bytes input
2671 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002672 'utf-16', 'utf-16-le', 'utf-16-be',
2673 'utf-32', 'utf-32-le', 'utf-32-be',
2674 )
2675 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002676 decoder = enc and codecs.getincrementaldecoder(enc)()
2677 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2678 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002679 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002680 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2681 self.check_newline_decoding_utf8(decoder)
2682
Antoine Pitrou66913e22009-03-06 23:40:56 +00002683 def test_newline_bytes(self):
2684 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2685 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002686 self.assertEqual(dec.newlines, None)
2687 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2688 self.assertEqual(dec.newlines, None)
2689 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2690 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002691 dec = self.IncrementalNewlineDecoder(None, translate=False)
2692 _check(dec)
2693 dec = self.IncrementalNewlineDecoder(None, translate=True)
2694 _check(dec)
2695
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002696class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2697 pass
2698
2699class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2700 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002701
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002702
Guido van Rossum01a27522007-03-07 01:00:12 +00002703# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002704
Guido van Rossum5abbf752007-08-27 17:39:33 +00002705class MiscIOTest(unittest.TestCase):
2706
Barry Warsaw40e82462008-11-20 20:14:50 +00002707 def tearDown(self):
2708 support.unlink(support.TESTFN)
2709
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002710 def test___all__(self):
2711 for name in self.io.__all__:
2712 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002713 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002714 if name == "open":
2715 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002716 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002717 self.assertTrue(issubclass(obj, Exception), name)
2718 elif not name.startswith("SEEK_"):
2719 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002720
Barry Warsaw40e82462008-11-20 20:14:50 +00002721 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002722 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002723 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002724 f.close()
2725
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002726 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002727 self.assertEqual(f.name, support.TESTFN)
2728 self.assertEqual(f.buffer.name, support.TESTFN)
2729 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2730 self.assertEqual(f.mode, "U")
2731 self.assertEqual(f.buffer.mode, "rb")
2732 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002733 f.close()
2734
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002735 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002736 self.assertEqual(f.mode, "w+")
2737 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2738 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002739
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002740 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002741 self.assertEqual(g.mode, "wb")
2742 self.assertEqual(g.raw.mode, "wb")
2743 self.assertEqual(g.name, f.fileno())
2744 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002745 f.close()
2746 g.close()
2747
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002748 def test_io_after_close(self):
2749 for kwargs in [
2750 {"mode": "w"},
2751 {"mode": "wb"},
2752 {"mode": "w", "buffering": 1},
2753 {"mode": "w", "buffering": 2},
2754 {"mode": "wb", "buffering": 0},
2755 {"mode": "r"},
2756 {"mode": "rb"},
2757 {"mode": "r", "buffering": 1},
2758 {"mode": "r", "buffering": 2},
2759 {"mode": "rb", "buffering": 0},
2760 {"mode": "w+"},
2761 {"mode": "w+b"},
2762 {"mode": "w+", "buffering": 1},
2763 {"mode": "w+", "buffering": 2},
2764 {"mode": "w+b", "buffering": 0},
2765 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002766 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002767 f.close()
2768 self.assertRaises(ValueError, f.flush)
2769 self.assertRaises(ValueError, f.fileno)
2770 self.assertRaises(ValueError, f.isatty)
2771 self.assertRaises(ValueError, f.__iter__)
2772 if hasattr(f, "peek"):
2773 self.assertRaises(ValueError, f.peek, 1)
2774 self.assertRaises(ValueError, f.read)
2775 if hasattr(f, "read1"):
2776 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002777 if hasattr(f, "readall"):
2778 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002779 if hasattr(f, "readinto"):
2780 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2781 self.assertRaises(ValueError, f.readline)
2782 self.assertRaises(ValueError, f.readlines)
2783 self.assertRaises(ValueError, f.seek, 0)
2784 self.assertRaises(ValueError, f.tell)
2785 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002786 self.assertRaises(ValueError, f.write,
2787 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002788 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002789 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002790
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002791 def test_blockingioerror(self):
2792 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002793 class C(str):
2794 pass
2795 c = C("")
2796 b = self.BlockingIOError(1, c)
2797 c.b = b
2798 b.c = c
2799 wr = weakref.ref(c)
2800 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002801 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002802 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002803
2804 def test_abcs(self):
2805 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002806 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2807 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2808 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2809 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002810
2811 def _check_abc_inheritance(self, abcmodule):
2812 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002813 self.assertIsInstance(f, abcmodule.IOBase)
2814 self.assertIsInstance(f, abcmodule.RawIOBase)
2815 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2816 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002817 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002818 self.assertIsInstance(f, abcmodule.IOBase)
2819 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2820 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2821 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002822 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002823 self.assertIsInstance(f, abcmodule.IOBase)
2824 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2825 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2826 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002827
2828 def test_abc_inheritance(self):
2829 # Test implementations inherit from their respective ABCs
2830 self._check_abc_inheritance(self)
2831
2832 def test_abc_inheritance_official(self):
2833 # Test implementations inherit from the official ABCs of the
2834 # baseline "io" module.
2835 self._check_abc_inheritance(io)
2836
Antoine Pitroue033e062010-10-29 10:38:18 +00002837 def _check_warn_on_dealloc(self, *args, **kwargs):
2838 f = open(*args, **kwargs)
2839 r = repr(f)
2840 with self.assertWarns(ResourceWarning) as cm:
2841 f = None
2842 support.gc_collect()
2843 self.assertIn(r, str(cm.warning.args[0]))
2844
2845 def test_warn_on_dealloc(self):
2846 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2847 self._check_warn_on_dealloc(support.TESTFN, "wb")
2848 self._check_warn_on_dealloc(support.TESTFN, "w")
2849
2850 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2851 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002852 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002853 for fd in fds:
2854 try:
2855 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02002856 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00002857 if e.errno != errno.EBADF:
2858 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002859 self.addCleanup(cleanup_fds)
2860 r, w = os.pipe()
2861 fds += r, w
2862 self._check_warn_on_dealloc(r, *args, **kwargs)
2863 # When using closefd=False, there's no warning
2864 r, w = os.pipe()
2865 fds += r, w
2866 with warnings.catch_warnings(record=True) as recorded:
2867 open(r, *args, closefd=False, **kwargs)
2868 support.gc_collect()
2869 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002870
2871 def test_warn_on_dealloc_fd(self):
2872 self._check_warn_on_dealloc_fd("rb", buffering=0)
2873 self._check_warn_on_dealloc_fd("rb")
2874 self._check_warn_on_dealloc_fd("r")
2875
2876
Antoine Pitrou243757e2010-11-05 21:15:39 +00002877 def test_pickling(self):
2878 # Pickling file objects is forbidden
2879 for kwargs in [
2880 {"mode": "w"},
2881 {"mode": "wb"},
2882 {"mode": "wb", "buffering": 0},
2883 {"mode": "r"},
2884 {"mode": "rb"},
2885 {"mode": "rb", "buffering": 0},
2886 {"mode": "w+"},
2887 {"mode": "w+b"},
2888 {"mode": "w+b", "buffering": 0},
2889 ]:
2890 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2891 with self.open(support.TESTFN, **kwargs) as f:
2892 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2893
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002894 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2895 def test_nonblock_pipe_write_bigbuf(self):
2896 self._test_nonblock_pipe_write(16*1024)
2897
2898 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2899 def test_nonblock_pipe_write_smallbuf(self):
2900 self._test_nonblock_pipe_write(1024)
2901
2902 def _set_non_blocking(self, fd):
2903 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2904 self.assertNotEqual(flags, -1)
2905 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2906 self.assertEqual(res, 0)
2907
2908 def _test_nonblock_pipe_write(self, bufsize):
2909 sent = []
2910 received = []
2911 r, w = os.pipe()
2912 self._set_non_blocking(r)
2913 self._set_non_blocking(w)
2914
2915 # To exercise all code paths in the C implementation we need
2916 # to play with buffer sizes. For instance, if we choose a
2917 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2918 # then we will never get a partial write of the buffer.
2919 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2920 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2921
2922 with rf, wf:
2923 for N in 9999, 73, 7574:
2924 try:
2925 i = 0
2926 while True:
2927 msg = bytes([i % 26 + 97]) * N
2928 sent.append(msg)
2929 wf.write(msg)
2930 i += 1
2931
2932 except self.BlockingIOError as e:
2933 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01002934 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002935 sent[-1] = sent[-1][:e.characters_written]
2936 received.append(rf.read())
2937 msg = b'BLOCKED'
2938 wf.write(msg)
2939 sent.append(msg)
2940
2941 while True:
2942 try:
2943 wf.flush()
2944 break
2945 except self.BlockingIOError as e:
2946 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01002947 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002948 self.assertEqual(e.characters_written, 0)
2949 received.append(rf.read())
2950
2951 received += iter(rf.read, None)
2952
2953 sent, received = b''.join(sent), b''.join(received)
2954 self.assertTrue(sent == received)
2955 self.assertTrue(wf.closed)
2956 self.assertTrue(rf.closed)
2957
Charles-François Natalidc3044c2012-01-09 22:40:02 +01002958 def test_create_fail(self):
2959 # 'x' mode fails if file is existing
2960 with self.open(support.TESTFN, 'w'):
2961 pass
2962 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
2963
2964 def test_create_writes(self):
2965 # 'x' mode opens for writing
2966 with self.open(support.TESTFN, 'xb') as f:
2967 f.write(b"spam")
2968 with self.open(support.TESTFN, 'rb') as f:
2969 self.assertEqual(b"spam", f.read())
2970
Christian Heimes7b648752012-09-10 14:48:43 +02002971 def test_open_allargs(self):
2972 # there used to be a buffer overflow in the parser for rawmode
2973 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
2974
2975
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002976class CMiscIOTest(MiscIOTest):
2977 io = io
2978
2979class PyMiscIOTest(MiscIOTest):
2980 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002981
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002982
2983@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2984class SignalsTest(unittest.TestCase):
2985
2986 def setUp(self):
2987 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2988
2989 def tearDown(self):
2990 signal.signal(signal.SIGALRM, self.oldalrm)
2991
2992 def alarm_interrupt(self, sig, frame):
2993 1/0
2994
2995 @unittest.skipUnless(threading, 'Threading required for this test.')
2996 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2997 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00002998 invokes the signal handler, and bubbles up the exception raised
2999 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003000 read_results = []
3001 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02003002 if hasattr(signal, 'pthread_sigmask'):
3003 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003004 s = os.read(r, 1)
3005 read_results.append(s)
3006 t = threading.Thread(target=_read)
3007 t.daemon = True
3008 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00003009 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003010 try:
3011 wio = self.io.open(w, **fdopen_kwargs)
3012 t.start()
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003013 signal.alarm(1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003014 # Fill the pipe enough that the write will be blocking.
3015 # It will be interrupted by the timer armed above. Since the
3016 # other thread has read one byte, the low-level write will
3017 # return with a successful (partial) result rather than an EINTR.
3018 # The buffered IO layer must check for pending signal
3019 # handlers, which in this case will invoke alarm_interrupt().
3020 self.assertRaises(ZeroDivisionError,
Charles-François Natali2d517212011-05-29 16:36:44 +02003021 wio.write, item * (support.PIPE_MAX_SIZE // len(item)))
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003022 t.join()
3023 # We got one byte, get another one and check that it isn't a
3024 # repeat of the first one.
3025 read_results.append(os.read(r, 1))
3026 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3027 finally:
3028 os.close(w)
3029 os.close(r)
3030 # This is deliberate. If we didn't close the file descriptor
3031 # before closing wio, wio would try to flush its internal
3032 # buffer, and block again.
3033 try:
3034 wio.close()
3035 except IOError as e:
3036 if e.errno != errno.EBADF:
3037 raise
3038
3039 def test_interrupted_write_unbuffered(self):
3040 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3041
3042 def test_interrupted_write_buffered(self):
3043 self.check_interrupted_write(b"xy", b"xy", mode="wb")
3044
3045 def test_interrupted_write_text(self):
3046 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3047
Brett Cannon31f59292011-02-21 19:29:56 +00003048 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003049 def check_reentrant_write(self, data, **fdopen_kwargs):
3050 def on_alarm(*args):
3051 # Will be called reentrantly from the same thread
3052 wio.write(data)
3053 1/0
3054 signal.signal(signal.SIGALRM, on_alarm)
3055 r, w = os.pipe()
3056 wio = self.io.open(w, **fdopen_kwargs)
3057 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003058 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003059 # Either the reentrant call to wio.write() fails with RuntimeError,
3060 # or the signal handler raises ZeroDivisionError.
3061 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
3062 while 1:
3063 for i in range(100):
3064 wio.write(data)
3065 wio.flush()
3066 # Make sure the buffer doesn't fill up and block further writes
3067 os.read(r, len(data) * 100)
3068 exc = cm.exception
3069 if isinstance(exc, RuntimeError):
3070 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
3071 finally:
3072 wio.close()
3073 os.close(r)
3074
3075 def test_reentrant_write_buffered(self):
3076 self.check_reentrant_write(b"xy", mode="wb")
3077
3078 def test_reentrant_write_text(self):
3079 self.check_reentrant_write("xy", mode="w", encoding="ascii")
3080
Antoine Pitrou707ce822011-02-25 21:24:11 +00003081 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3082 """Check that a buffered read, when it gets interrupted (either
3083 returning a partial result or EINTR), properly invokes the signal
3084 handler and retries if the latter returned successfully."""
3085 r, w = os.pipe()
3086 fdopen_kwargs["closefd"] = False
3087 def alarm_handler(sig, frame):
3088 os.write(w, b"bar")
3089 signal.signal(signal.SIGALRM, alarm_handler)
3090 try:
3091 rio = self.io.open(r, **fdopen_kwargs)
3092 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003093 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003094 # Expected behaviour:
3095 # - first raw read() returns partial b"foo"
3096 # - second raw read() returns EINTR
3097 # - third raw read() returns b"bar"
3098 self.assertEqual(decode(rio.read(6)), "foobar")
3099 finally:
3100 rio.close()
3101 os.close(w)
3102 os.close(r)
3103
Antoine Pitrou20db5112011-08-19 20:32:34 +02003104 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003105 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3106 mode="rb")
3107
Antoine Pitrou20db5112011-08-19 20:32:34 +02003108 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003109 self.check_interrupted_read_retry(lambda x: x,
3110 mode="r")
3111
3112 @unittest.skipUnless(threading, 'Threading required for this test.')
3113 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3114 """Check that a buffered write, when it gets interrupted (either
3115 returning a partial result or EINTR), properly invokes the signal
3116 handler and retries if the latter returned successfully."""
3117 select = support.import_module("select")
3118 # A quantity that exceeds the buffer size of an anonymous pipe's
3119 # write end.
3120 N = 1024 * 1024
3121 r, w = os.pipe()
3122 fdopen_kwargs["closefd"] = False
3123 # We need a separate thread to read from the pipe and allow the
3124 # write() to finish. This thread is started after the SIGALRM is
3125 # received (forcing a first EINTR in write()).
3126 read_results = []
3127 write_finished = False
3128 def _read():
3129 while not write_finished:
3130 while r in select.select([r], [], [], 1.0)[0]:
3131 s = os.read(r, 1024)
3132 read_results.append(s)
3133 t = threading.Thread(target=_read)
3134 t.daemon = True
3135 def alarm1(sig, frame):
3136 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003137 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003138 def alarm2(sig, frame):
3139 t.start()
3140 signal.signal(signal.SIGALRM, alarm1)
3141 try:
3142 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003143 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003144 # Expected behaviour:
3145 # - first raw write() is partial (because of the limited pipe buffer
3146 # and the first alarm)
3147 # - second raw write() returns EINTR (because of the second alarm)
3148 # - subsequent write()s are successful (either partial or complete)
3149 self.assertEqual(N, wio.write(item * N))
3150 wio.flush()
3151 write_finished = True
3152 t.join()
3153 self.assertEqual(N, sum(len(x) for x in read_results))
3154 finally:
3155 write_finished = True
3156 os.close(w)
3157 os.close(r)
3158 # This is deliberate. If we didn't close the file descriptor
3159 # before closing wio, wio would try to flush its internal
3160 # buffer, and could block (in case of failure).
3161 try:
3162 wio.close()
3163 except IOError as e:
3164 if e.errno != errno.EBADF:
3165 raise
3166
Antoine Pitrou20db5112011-08-19 20:32:34 +02003167 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003168 self.check_interrupted_write_retry(b"x", mode="wb")
3169
Antoine Pitrou20db5112011-08-19 20:32:34 +02003170 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003171 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3172
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003173
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003174class CSignalsTest(SignalsTest):
3175 io = io
3176
3177class PySignalsTest(SignalsTest):
3178 io = pyio
3179
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003180 # Handling reentrancy issues would slow down _pyio even more, so the
3181 # tests are disabled.
3182 test_reentrant_write_buffered = None
3183 test_reentrant_write_text = None
3184
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003185
Guido van Rossum28524c72007-02-27 05:47:44 +00003186def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003187 tests = (CIOTest, PyIOTest,
3188 CBufferedReaderTest, PyBufferedReaderTest,
3189 CBufferedWriterTest, PyBufferedWriterTest,
3190 CBufferedRWPairTest, PyBufferedRWPairTest,
3191 CBufferedRandomTest, PyBufferedRandomTest,
3192 StatefulIncrementalDecoderTest,
3193 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3194 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003195 CMiscIOTest, PyMiscIOTest,
3196 CSignalsTest, PySignalsTest,
3197 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003198
3199 # Put the namespaces of the IO module we are testing and some useful mock
3200 # classes in the __dict__ of each test.
3201 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003202 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003203 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3204 c_io_ns = {name : getattr(io, name) for name in all_members}
3205 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3206 globs = globals()
3207 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3208 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3209 # Avoid turning open into a bound method.
3210 py_io_ns["open"] = pyio.OpenWrapper
3211 for test in tests:
3212 if test.__name__.startswith("C"):
3213 for name, obj in c_io_ns.items():
3214 setattr(test, name, obj)
3215 elif test.__name__.startswith("Py"):
3216 for name, obj in py_io_ns.items():
3217 setattr(test, name, obj)
3218
3219 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00003220
3221if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003222 test_main()