blob: 6931e0a5a7dc91ab5b4ab3ea61cbc80c212e926a [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020035from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000038
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000039import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000042try:
43 import threading
44except ImportError:
45 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010046try:
47 import fcntl
48except ImportError:
49 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000050
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size.

    Opens this very file in text mode and returns the ``_CHUNK_SIZE``
    attribute of the resulting file object.
    """
    with open(__file__, "r", encoding="latin-1") as f:
        return f._CHUNK_SIZE
55
56
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    *read_stack* is an iterable of chunks handed out by readinto(), in
    order.  A ``None`` entry makes readinto() return None once.  Everything
    passed to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Number of readinto() calls, and how many of them happened after
        # the read stack was already exhausted.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        """Record a bytes copy of *b* and report it as fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # *whence* defaults to 0 so that the one-argument form accepted by
        # the io base classes works here as well.
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next pending chunk into *buf*.

        Returns the number of bytes copied, None when the next chunk is
        None, or 0 when the read stack is empty.  A chunk larger than
        *buf* is split; the remainder stays at the front of the stack.
        """
        self._reads += 1
        max_len = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        data = self._read_stack[0]
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        # Chunk does not fit: fill the buffer and keep the tail for later.
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        return pos
112
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead mixed into the C implementation's RawIOBase."""
115
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead mixed into the Python implementation's RawIOBase."""
118
119
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit read() method."""

    def read(self, n=None):
        """Pop and return the next chunk of the read stack, ignoring *n*.

        Returns b"" (and counts an extraneous read) once the stack is empty.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted stack is expected here; anything else
            # should propagate instead of being silently swallowed.
            self._extraneous_reads += 1
            return b""
129
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO mixed into the C implementation's RawIOBase."""
132
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO mixed into the Python implementation's RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000135
Guido van Rossuma9e20242007-03-08 00:43:48 +0000136
class MisbehavedRawIO(MockRawIO):
    """A MockRawIO whose methods deliberately return bogus results."""

    def write(self, b):
        # Claims twice as many bytes as were actually recorded.
        return super().write(b) * 2

    def read(self, n=None):
        # Returns the chunk repeated twice.
        return super().read(n) * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Performs the real readinto() but reports five times the buffer
        # length as the number of bytes read.
        super().readinto(buf)
        return len(buf) * 5
153
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO mixed into the C implementation's RawIOBase."""
156
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO mixed into the Python implementation's RawIOBase."""
159
160
class CloseFailureIO(MockRawIO):
    """A MockRawIO whose first close() call raises IOError."""

    # Class-level default; close() sets it to 1 on the instance.
    closed = 0

    def close(self):
        if not self.closed:
            # Mark as closed *before* raising so later calls are no-ops.
            self.closed = 1
            raise IOError
168
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO mixed into the C implementation's RawIOBase."""
171
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO mixed into the Python implementation's RawIOBase."""
174
175
class MockFileIO:
    """Mixin that records the size of every read in ``read_history``.

    It delegates to the next class in the MRO, so it has to be combined
    with a concrete stream class (see the C/Py subclasses).
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        res = super().read(n)
        # Log the length of the result, or None if read() returned None.
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        res = super().readinto(b)
        self.read_history.append(res)
        return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000191
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO layered over the C implementation's BytesIO."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000194
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO layered over the Python implementation's BytesIO."""
197
198
class MockUnseekableIO:
    """Mixin that makes a stream report itself as not seekable.

    Subclasses must provide an ``UnsupportedOperation`` attribute; it is
    the exception type raised by seek() and tell().
    """

    def seekable(self):
        return False

    def seek(self, *args):
        raise self.UnsupportedOperation("not seekable")

    def tell(self, *args):
        raise self.UnsupportedOperation("not seekable")
208
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """Unseekable BytesIO built on the C implementation."""
    UnsupportedOperation = io.UnsupportedOperation
211
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """Unseekable BytesIO built on the Python implementation."""
    UnsupportedOperation = pyio.UnsupportedOperation
214
215
class MockNonBlockWriterIO:
    """Raw-stream mock whose write() can simulate a partial/blocked write.

    Written data accumulates in ``_write_stack`` until pop_written() is
    called.  After block_on(char), write() stops short at the first
    occurrence of *char* (see write()).
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far as one bytes object and reset."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b* and return the number of bytes accepted.

        With a blocker set and present in *b*: if it is not the first
        byte, only the data before it is recorded and its length returned;
        if it is the first byte, the blocker is cancelled and None is
        returned.  Otherwise all of *b* is recorded.
        """
        b = bytes(b)
        if self._blocker_char:
            # find() yields -1 when the blocker is absent, which falls
            # through to the ordinary full write below.
            n = b.find(self._blocker_char)
            if n > 0:
                # write data up to the first blocker
                self._write_stack.append(b[:n])
                return n
            if n == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(b)
        return len(b)
259
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO mixed into the C implementation's RawIOBase."""
    BlockingIOError = io.BlockingIOError
262
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO mixed into the Python implementation's RawIOBase."""
    BlockingIOError = pyio.BlockingIOError
265
Guido van Rossuma9e20242007-03-08 00:43:48 +0000266
class IOTest(unittest.TestCase):
    """General tests of open() and the raw/buffered/text file objects.

    The tests go through attributes such as ``self.open``, ``self.FileIO``,
    ``self.BytesIO`` and ``self.UnsupportedOperation`` that this class does
    not define itself.
    NOTE(review): presumably they are injected on the C/Py subclasses
    elsewhere in the file -- confirm.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate sequence on writable stream *f*."""
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/seek sequence on *f*, expected to hold
        b"hello world\\n".  With *buffered* true, also check argument-less
        read() calls."""
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        """Exercise seek/tell/write/truncate around offset ``LARGE`` on *f*."""
        # Real assertions rather than ``assert`` statements, which are
        # stripped when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_open_handles_NUL_chars(self):
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
        self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a proper skip instead of printing and silently
                # passing.
                self.skipTest(
                    "Testing large file ops skipped on %s. "
                    "It requires %d bytes and a long time. "
                    "Use 'regrtest.py -u largefile test_io' to run it."
                    % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check that destroying an instance of a *base* subclass calls
        __del__, close() and flush(), in that order."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            # Close the second wrapper explicitly; with closefd=False this
            # leaves the descriptor owned by *f* open.
            with self.open(f.fileno(), "r", closefd=False) as file:
                self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed
        self.assertTrue(f.closed)

    def test_multi_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))

    def test_opener(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        fd = os.open(support.TESTFN, os.O_RDONLY)
        def opener(path, flags):
            return fd
        with self.open("non-existent", "r", opener=opener) as f:
            self.assertEqual(f.read(), "egg\n")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()
659
660
class CIOTest(IOTest):
    """IOTest plus a finalization regression test (issue #12149)."""

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000680
class PyIOTest(IOTest):
    """IOTest with no additional tests of its own."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000683
Guido van Rossuma9e20242007-03-08 00:43:48 +0000684
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom.
    #
    # This is a mixin: the concrete test class provides `tp` (the buffered
    # class under test) and the raw-stream helpers looked up through `self`
    # (MockRawIO, CloseFailureIO, MockUnseekableIO, UnsupportedOperation).

    def test_detach(self):
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        self.assertIs(buffered.detach(), raw_stream)
        # Once detached there is no raw stream left to hand back.
        with self.assertRaises(ValueError):
            buffered.detach()

    def test_fileno(self):
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)

        self.assertEqual(42, buffered.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        # Invalid whence
        for bad_whence in (-1, 9):
            with self.assertRaises(ValueError):
                buffered.seek(0, bad_whence)

    def test_override_destructor(self):
        base = self.tp
        calls = []

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                try:
                    parent_del = super().__del__
                except AttributeError:
                    pass
                else:
                    parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        raw_stream = self.MockRawIO()
        buffered = MyBufferedIO(raw_stream)
        # Remember this before the object goes away: flush() is only
        # expected to be recorded for writable streams.
        is_writable = buffered.writable()
        del buffered
        support.gc_collect()
        expected = [1, 2, 3] if is_writable else [1, 2]
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)

        def enter_and_leave():
            with buffered:
                pass

        enter_and_leave()
        # The stream is closed after the first pass, so entering it again
        # has to raise ValueError.
        self.assertRaises(ValueError, enter_and_leave)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        raw_stream = self.CloseFailureIO()

        def touch_missing_attribute():
            self.tp(raw_stream).xyzzy

        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        output = captured.getvalue().strip()
        if output:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(output.splitlines()), 1)
            self.assertTrue(output.startswith("Exception IOError: "), output)
            self.assertTrue(output.endswith(" ignored"), output)

    def test_repr(self):
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        qualified = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        # No name on the raw stream: only the class shows up.
        self.assertEqual(repr(buffered), "<%s>" % qualified)
        raw_stream.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % qualified)
        raw_stream.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name=b'dummy'>" % qualified)

    def test_flush_error_on_close(self):
        raw_stream = self.MockRawIO()

        def failing_flush():
            raise IOError()

        raw_stream.flush = failing_flush
        buffered = self.tp(raw_stream)
        with self.assertRaises(IOError):  # exception not swallowed
            buffered.close()
        self.assertTrue(buffered.closed)

    def test_close_error_on_close(self):
        raw_stream = self.MockRawIO()

        def failing_flush():
            raise IOError('flush')

        def failing_close():
            raise IOError('close')

        raw_stream.close = failing_close
        buffered = self.tp(raw_stream)
        buffered.flush = failing_flush
        with self.assertRaises(IOError) as caught:  # exception not swallowed
            buffered.close()
        # The close() failure is the one reported, with the earlier flush()
        # failure attached as its context.
        self.assertEqual(caught.exception.args, ('close',))
        self.assertEqual(caught.exception.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

    def test_multi_close(self):
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        # Closing repeatedly must be harmless.
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        buffered = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            buffered.tell()
        with self.assertRaises(self.UnsupportedOperation):
            buffered.seek(0)

    def test_readonly_attributes(self):
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        replacement = self.MockRawIO()
        # The `raw` attribute cannot be rebound.
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
820
Guido van Rossum78892e42007-04-06 17:31:18 +0000821
class SizeofTest:
    # Mixin; the concrete class provides `tp` and `MockRawIO`.

    @support.cpython_only
    def test_sizeof(self):
        small = 4096
        large = 8192
        # Whatever sys.getsizeof() reports beyond the buffer itself is taken
        # as the fixed part of the object's size ...
        buffered = self.tp(self.MockRawIO(), buffer_size=small)
        fixed_part = sys.getsizeof(buffered) - small
        # ... and must be the same for a differently sized buffer.
        buffered = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(buffered), fixed_part + large)
834
835
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Reader-side tests shared by the concrete subclasses, which set `tp`
    # to the BufferedReader class under test.
    read_mode = "rb"

    def test_constructor(self):
        raw_stream = self.MockRawIO([b"abc"])
        reader = self.tp(raw_stream)
        # Re-running __init__ on a live object is allowed.
        reader.__init__(raw_stream)
        for size in (1024, 16):
            reader.__init__(raw_stream, buffer_size=size)
        self.assertEqual(b"abc", reader.read())
        # Zero or negative buffer sizes are rejected.
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                reader.__init__(raw_stream, buffer_size=bad_size)
        raw_stream = self.MockRawIO([b"abc"])
        reader.__init__(raw_stream)
        self.assertEqual(b"abc", reader.read())

    def test_read(self):
        for size_arg in (None, 7):
            raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
            reader = self.tp(raw_stream)
            self.assertEqual(b"abcdefg", reader.read(size_arg))
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read(-2)

    def test_read1(self):
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)
        self.assertEqual(b"a", reader.read(1))
        self.assertEqual(b"b", reader.read1(1))
        self.assertEqual(raw_stream._reads, 1)
        # Each pair is (what read1(100) returns, raw reads issued so far).
        for expected, raw_reads in ((b"c", 1), (b"d", 2), (b"efg", 3), (b"", 4)):
            self.assertEqual(expected, reader.read1(100))
            self.assertEqual(raw_stream._reads, raw_reads)
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read1(-1)

    def test_readinto(self):
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)
        target = bytearray(2)
        # Each pair is (bytes reported as read, buffer contents afterwards);
        # a short read leaves the tail of the buffer untouched.
        steps = ((2, b"ab"), (2, b"cd"), (2, b"ef"), (1, b"gf"), (0, b"gf"))
        for nread, contents in steps:
            self.assertEqual(reader.readinto(target), nread)
            self.assertEqual(target, contents)
        raw_stream = self.MockRawIO((b"abc", None))
        reader = self.tp(raw_stream)
        for nread, contents in ((2, b"ab"), (1, b"cb")):
            self.assertEqual(reader.readinto(target), nread)
            self.assertEqual(target, contents)

    def test_readlines(self):
        def fresh_reader():
            raw_stream = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(raw_stream)

        every_line = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(fresh_reader().readlines(), every_line)
        self.assertEqual(fresh_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh_reader().readlines(None), every_line)

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # (buffer size, sizes passed to read(), sizes the raw stream sees)
        scenarios = [
            (100, (3, 1, 4, 8), [dlen, 0]),
            (100, (3, 3, 3), [dlen]),
            (4, (1, 2, 4, 2), [4, 4, 1]),
        ]

        for bufsize, requested_sizes, raw_read_sizes in scenarios:
            raw_stream = self.MockFileIO(data)
            reader = self.tp(raw_stream, buffer_size=bufsize)
            offset = 0
            for nbytes in requested_sizes:
                chunk = data[offset:offset + nbytes]
                self.assertEqual(reader.read(nbytes), chunk)
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(raw_stream.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        raw_stream = self.MockRawIO(
            (b"abc", b"d", None, b"efg", None, None, None))
        reader = self.tp(raw_stream)
        self.assertEqual(b"abcd", reader.read(6))
        self.assertEqual(b"e", reader.read(1))
        self.assertEqual(b"fg", reader.read())
        self.assertEqual(b"", reader.peek(1))
        self.assertIsNone(reader.read())
        self.assertEqual(b"", reader.read())

        # Same idea, directly on the raw stream's readall().
        raw_stream = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", raw_stream.readall())
        self.assertIsNone(raw_stream.readall())

    def test_read_past_eof(self):
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)

        self.assertEqual(b"abcdefg", reader.read(9000))

    def test_read_all(self):
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)

        self.assertEqual(b"abcdefg", reader.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(values)
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                reader = self.tp(raw, 8)
                errors = []
                results = []

                def worker():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = reader.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                threads = [threading.Thread(target=worker) for _ in range(20)]
                for thread in threads:
                    thread.start()
                time.sleep(0.02)  # yield
                for thread in threads:
                    thread.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                # Every byte value must have been read exactly N times.
                for value in range(256):
                    self.assertEqual(combined.count(bytes([value])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        reader = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            reader.tell()
        with self.assertRaises(self.UnsupportedOperation):
            reader.seek(0)
        # Same outcome once some data has been read.
        reader.read(1)
        with self.assertRaises(self.UnsupportedOperation):
            reader.seek(0)
        with self.assertRaises(self.UnsupportedOperation):
            reader.tell()

    def test_misbehaved_io(self):
        raw_stream = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw_stream)
        with self.assertRaises(IOError):
            reader.seek(0)
        with self.assertRaises(IOError):
            reader.tell()

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            # First the simple case, where one raw read is enough to satisfy
            # the request; then a more complex case where two raw reads are
            # needed.
            for chunks in ([b"x" * n], [b"x" * (n - 1), b"x"]):
                raw_stream = self.MockRawIO(chunks)
                reader = self.tp(raw_stream, bufsize)
                self.assertEqual(reader.read(n), b"x" * n)
                self.assertEqual(raw_stream._extraneous_reads, 0,
                    "failed for {}: {} != 0".format(
                        n, raw_stream._extraneous_reads))
1031
1032
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the shared reader tests against io.BufferedReader and adds a few
    # checks written specifically for that class.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            with self.assertRaises((OverflowError, MemoryError, ValueError)):
                bufio.__init__(rawio, sys.maxsize)

    def test_initialization(self):
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        # After every rejected re-initialization, reading must fail too.
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                bufio.__init__(rawio, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                bufio.read()

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        with self.assertRaises(IOError):
            bufio.read(10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # Opening with "w+b" creates TESTFN on disk; make sure it is removed
        # again whatever the outcome of the test.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Self-reference, so only the cycle collector can reclaim the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        # assertIsNone reports the surviving object if the check fails.
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001073
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited BufferedReaderTest suite with pyio.BufferedReader
    # as the class under test.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001076
Guido van Rossuma9e20242007-03-08 00:43:48 +00001077
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Writer-side tests shared by the concrete subclasses, which set `tp`
    # to the BufferedWriter class under test.
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-running __init__ on a live object is allowed.
        bufio.__init__(rawio)
        for size in (1024, 16):
            bufio.__init__(rawio, buffer_size=size)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        # Zero or negative buffer sizes are rejected.
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                bufio.__init__(rawio, buffer_size=bad_size)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        # Nothing has reached the raw stream yet ...
        self.assertFalse(raw._write_stack)
        # ... but detaching pushes the pending data out.
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # `intermediate_func` is called with the buffered object after
        # every single write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)

        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for _ in range(15):
                    yield size

        sizes = gen_sizes()
        while n < len(contents):
            # Never write past the end of the payload.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Both helpers move the position around and finish by putting it
        # back where it started.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)

        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        with self.assertRaises(self.BlockingIOError) as caught:
            bufio.write(b"opqrwxyz0123456789")
        self.assertEqual(caught.exception.characters_written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        # The overwrite has landed in the raw stream by now.
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, with a UserList instead of a list.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        # Non-bytes items, a non-iterable and a str are all rejected.
        for bad_lines in ([1, 2, 3], None, 'abc'):
            with self.assertRaises(TypeError):
                bufio.writelines(bad_lines)

    def test_destructor(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        # Dropping the last reference must flush the pending data.
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        #
        # The test creates TESTFN on disk; make sure it is removed again
        # whatever the outcome.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # The position is left where the write ended.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []

                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=f) for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02)  # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunks may land in any order, but every byte value must be
            # present exactly N times.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        with self.assertRaises(IOError):
            bufio.seek(0)
        with self.assertRaises(IOError):
            bufio.tell()
        with self.assertRaises(IOError):
            bufio.write(b"abcdef")

    def test_max_buffer_size_removal(self):
        # A third positional argument is no longer accepted.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()

        def bad_write(b):
            raise IOError()

        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        with self.assertRaises(IOError):  # exception not swallowed
            b.close()
        self.assertTrue(b.closed)
1325
Benjamin Peterson59406a92009-03-26 17:10:29 +00001326
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared writer tests against io.BufferedWriter and adds a few
    # checks written specifically for that class.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            with self.assertRaises((OverflowError, MemoryError, ValueError)):
                bufio.__init__(rawio, sys.maxsize)

    def test_initialization(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # After every rejected re-initialization, writing must fail too.
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                bufio.__init__(rawio, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                bufio.write(b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # Opening with "w+b" creates TESTFN on disk; make sure it is removed
        # again whatever the outcome of the test.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Self-reference, so only the cycle collector can reclaim the object.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        # assertIsNone reports the surviving object if the check fails.
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1364
1365
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the inherited BufferedWriterTest suite with pyio.BufferedWriter
    # as the class under test.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001368
class BufferedRWPairTest(unittest.TestCase):
    """Implementation-independent tests for BufferedRWPair.

    Concrete subclasses bind ``tp`` to the BufferedRWPair type to exercise.
    The helper types reached through ``self`` (MockRawIO, BytesIO,
    UnsupportedOperation) are not defined in this class; presumably the
    test loader injects them -- confirm against the rest of the file.
    """

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is no longer accepted.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # Each assertion needs a pair positioned at the start of the data.
        def make_pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        self.assertEqual(make_pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # An explicit None hint must behave like an omitted hint, just as
        # read(None) is checked in test_read.
        self.assertEqual(make_pair().readlines(None),
                         [b"abc\n", b"def\n", b"h"])
        self.assertEqual(make_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than requested, hence startswith().
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side is one.
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001485
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the inherited BufferedRWPairTest tests with io.BufferedRWPair
    # as the type under test.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001488
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the inherited BufferedRWPairTest tests with pyio.BufferedRWPair
    # as the type under test.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001491
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001492
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Tests for a buffered object used for both reading and writing.  The
    # reader and writer test suites are both inherited; subclasses bind
    # ``tp`` to the concrete type under test.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run both inherited constructor tests explicitly.
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        # The two buffered writes reach the raw stream as a single chunk.
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        # whence=2: relative to the end of the stream.
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        # whence=1: relative to the current position.
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # Non-integer offsets are rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario for the test_flush_and_* tests below.
        # read_func(bufio[, n]) must consume and return data from bufio.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # With no size given, use a buffer larger than the test data.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        # Run both inherited thread tests explicitly.
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    # NOTE(review): check_writes() used below is not defined in this class;
    # presumably it is inherited from BufferedWriterTest -- confirm.

    def test_writes_and_peek(self):
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        # Second variant: peek one byte back, then restore the position.
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        b = b"\x80\x81\x82\x83\x84"
        for i in range(0, len(b)):
            for j in range(i, len(b)):
                raw = self.BytesIO(b)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                expected = bytearray(b)
                # Same order as mutate(): when i == j the later write wins.
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        # Run both inherited misbehaved-io tests explicitly.
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as f:
                # Alternate one-byte writes with each kind of read call.
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                buf = bytearray(1)
                f.readinto(buf)
                self.assertEqual(buf, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as f:
                f.write(b'1')
                self.assertEqual(f.readline(), b'b\n')
                f.write(b'2')
                self.assertEqual(f.readline(), b'def\n')
                f.write(b'3')
                self.assertEqual(f.readline(), b'\n')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1716
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    """Run the BufferedRandom tests against io.BufferedRandom."""

    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        if sys.maxsize <= 0x7FFFFFFF:
            # The allocation can succeed on 32-bit builds, e.g. with more
            # than 2GB RAM and a 64-bit kernel.
            return
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        # Re-initializing with an absurd buffer size must fail cleanly.
        reinit = buffered.__init__
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            reinit(raw_stream, sys.maxsize)

    def test_garbage_collection(self):
        # Run the reader variant first, then the writer variant.
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)
1733
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the inherited BufferedRandomTest tests with pyio.BufferedRandom
    # as the type under test.
    tp = pyio.BufferedRandom
1736
1737
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001738# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1739# properties:
1740# - A single output character can correspond to many bytes of input.
1741# - The number of input bytes to complete the character can be
1742# undetermined until the last input byte is received.
1743# - The number of input bytes can vary depending on previous input.
1744# - A single input byte can correspond to many characters of output.
1745# - The number of output characters can be undetermined until the
1746# last input byte is received.
1747# - The number of output characters can vary depending on previous input.
1748
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A new I applies to the input that
    follows the 'i' word.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Restore the initial state: I = O = 1 and no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags), with flags encoding I and O."""
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        # Named "flags" rather than "io" to avoid shadowing the io module.
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Decode the bytes in *input* and return the resulting text.

        Incomplete words stay buffered until more input arrives, or until
        *final* is true, in which case the buffered word is flushed.
        """
        pieces = []
        for b in input:
            if self.i == 0:  # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i' and 'o' words update the input/output length and produce no
        output.  The buffer must not be empty when this is called.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if self.o:
                # Pad with hyphens, then truncate, to exactly O characters.
                output = output.ljust(self.o, '-')[:self.o]
            output += '.'
        self.buffer = bytearray()
        return output

    # The codec search function below only answers while this is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the 'test_decoder' codec.

        Returns a CodecInfo using this class as incremental decoder (and
        latin-1 for encoding) when the codec is enabled and *name* matches,
        otherwise None.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
1833
# Register the previous decoder's lookup function as a codec search function.
# It only answers lookups for 'test_decoder', and only while
# StatefulIncrementalDecoder.codecEnabled is true.  That flag is False by
# default, so tests must enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1837
1838
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Sanity checks for the StatefulIncrementalDecoder helper itself.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.'),
    ]

    def test_decoder(self):
        # One-shot decoding: every table entry gets a fresh decoder.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, final), expected)

        # An unfinished decode produces nothing until EOF is forced.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001881
1882class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001883
    def setUp(self):
        # Sample bytes mixing "\r\n", "\r" and "\n" line endings, and the
        # same text with every line ending written as "\n".
        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
        # Remove any leftover test file before the test starts.
        support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001888
    def tearDown(self):
        # Remove the test file a test may have created.
        support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001891
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001892 def test_constructor(self):
1893 r = self.BytesIO(b"\xc3\xa9\n\n")
1894 b = self.BufferedReader(r, 1000)
1895 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001896 t.__init__(b, encoding="latin-1", newline="\r\n")
1897 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001898 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001899 t.__init__(b, encoding="utf-8", line_buffering=True)
1900 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001901 self.assertEqual(t.line_buffering, True)
1902 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001903 self.assertRaises(TypeError, t.__init__, b, newline=42)
1904 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1905
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001906 def test_detach(self):
1907 r = self.BytesIO()
1908 b = self.BufferedWriter(r)
1909 t = self.TextIOWrapper(b)
1910 self.assertIs(t.detach(), b)
1911
1912 t = self.TextIOWrapper(b, encoding="ascii")
1913 t.write("howdy")
1914 self.assertFalse(r.getvalue())
1915 t.detach()
1916 self.assertEqual(r.getvalue(), b"howdy")
1917 self.assertRaises(ValueError, t.detach)
1918
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001919 def test_repr(self):
1920 raw = self.BytesIO("hello".encode("utf-8"))
1921 b = self.BufferedReader(raw)
1922 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001923 modname = self.TextIOWrapper.__module__
1924 self.assertEqual(repr(t),
1925 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1926 raw.name = "dummy"
1927 self.assertEqual(repr(t),
1928 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001929 t.mode = "r"
1930 self.assertEqual(repr(t),
1931 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001932 raw.name = b"dummy"
1933 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001934 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001935
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001936 def test_line_buffering(self):
1937 r = self.BytesIO()
1938 b = self.BufferedWriter(r, 1000)
1939 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001940 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001941 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001942 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001943 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001944 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001945 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001946
Victor Stinnerf86a5e82012-06-05 13:43:22 +02001947 def test_default_encoding(self):
1948 old_environ = dict(os.environ)
1949 try:
1950 # try to get a user preferred encoding different than the current
1951 # locale encoding to check that TextIOWrapper() uses the current
1952 # locale encoding and not the user preferred encoding
1953 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
1954 if key in os.environ:
1955 del os.environ[key]
1956
1957 current_locale_encoding = locale.getpreferredencoding(False)
1958 b = self.BytesIO()
1959 t = self.TextIOWrapper(b)
1960 self.assertEqual(t.encoding, current_locale_encoding)
1961 finally:
1962 os.environ.clear()
1963 os.environ.update(old_environ)
1964
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001965 def test_encoding(self):
1966 # Check the encoding attribute is always set, and valid
1967 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001968 t = self.TextIOWrapper(b, encoding="utf-8")
1969 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001970 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001971 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001972 codecs.lookup(t.encoding)
1973
1974 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001975 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001976 b = self.BytesIO(b"abc\n\xff\n")
1977 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001978 self.assertRaises(UnicodeError, t.read)
1979 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001980 b = self.BytesIO(b"abc\n\xff\n")
1981 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001982 self.assertRaises(UnicodeError, t.read)
1983 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001984 b = self.BytesIO(b"abc\n\xff\n")
1985 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001986 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001987 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001988 b = self.BytesIO(b"abc\n\xff\n")
1989 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001990 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001991
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001992 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001993 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001994 b = self.BytesIO()
1995 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001996 self.assertRaises(UnicodeError, t.write, "\xff")
1997 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001998 b = self.BytesIO()
1999 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002000 self.assertRaises(UnicodeError, t.write, "\xff")
2001 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002002 b = self.BytesIO()
2003 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002004 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002005 t.write("abc\xffdef\n")
2006 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002007 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002008 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002009 b = self.BytesIO()
2010 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002011 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002012 t.write("abc\xffdef\n")
2013 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002014 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002015
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002016 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002017 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2018
2019 tests = [
2020 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002021 [ '', input_lines ],
2022 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2023 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2024 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002025 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002026 encodings = (
2027 'utf-8', 'latin-1',
2028 'utf-16', 'utf-16-le', 'utf-16-be',
2029 'utf-32', 'utf-32-le', 'utf-32-be',
2030 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002031
Guido van Rossum8358db22007-08-18 21:39:55 +00002032 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002033 # character in TextIOWrapper._pending_line.
2034 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002035 # XXX: str.encode() should return bytes
2036 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002037 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002038 for bufsize in range(1, 10):
2039 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002040 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2041 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002042 encoding=encoding)
2043 if do_reads:
2044 got_lines = []
2045 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002046 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002047 if c2 == '':
2048 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002049 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002050 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002051 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002052 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002053
2054 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002055 self.assertEqual(got_line, exp_line)
2056 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002057
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002058 def test_newlines_input(self):
2059 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002060 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2061 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002062 (None, normalized.decode("ascii").splitlines(keepends=True)),
2063 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002064 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2065 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2066 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002067 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002068 buf = self.BytesIO(testdata)
2069 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002070 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002071 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002072 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002073
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002074 def test_newlines_output(self):
2075 testdict = {
2076 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2077 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2078 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2079 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2080 }
2081 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2082 for newline, expected in tests:
2083 buf = self.BytesIO()
2084 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2085 txt.write("AAA\nB")
2086 txt.write("BB\nCCC\n")
2087 txt.write("X\rY\r\nZ")
2088 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002089 self.assertEqual(buf.closed, False)
2090 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002091
2092 def test_destructor(self):
2093 l = []
2094 base = self.BytesIO
2095 class MyBytesIO(base):
2096 def close(self):
2097 l.append(self.getvalue())
2098 base.close(self)
2099 b = MyBytesIO()
2100 t = self.TextIOWrapper(b, encoding="ascii")
2101 t.write("abc")
2102 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002103 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002104 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002105
def test_override_destructor(self):
    # Each overridden hook appends its own marker so the order in which
    # the destructor drives close() and flush() can be checked.
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            try:
                parent_del = super().__del__
            except AttributeError:
                # The base class defines no __del__ to chain to.
                return
            parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2128
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        # The temporary wrapper is destroyed while the AttributeError
        # raised by the attribute lookup is still propagating.
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    output = captured.getvalue().strip()
    if not output:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(output.splitlines()), 1)
    self.assertTrue(output.startswith("Exception IOError: "), output)
    self.assertTrue(output.endswith(" ignored"), output)
Guido van Rossum8358db22007-08-18 21:39:55 +00002143
Guido van Rossum9b76da62007-04-11 01:09:03 +00002144 # Systematic tests of the text I/O API
2145
def test_basic_io(self):
    # Round-trip a short string through a real file for a range of chunk
    # sizes and encodings, checking read/seek/tell/write results.  Each
    # file is opened in a ``with`` block so that the handle is closed
    # even when one of the assertions fails.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # "utf-16-be" and "utf-16-le" are currently disabled.
        for enc in "ascii", "latin-1", "utf-8":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Cookie for the end of the three characters written.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                # Reading at end of file keeps returning "".
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append at the end and read the appended text back.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2174
def multi_line_test(self, f, enc):
    # Rewrite *f* from scratch with lines of assorted lengths, recording
    # the tell() cookie taken before each write, then check that reading
    # the file back line by line yields the same (cookie, line) pairs.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # *size* characters cycling through the sample, plus a newline.
        text = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    read_back = []
    while True:
        cookie = f.tell()
        text = f.readline()
        if not text:
            break
        read_back.append((cookie, text))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002196
def test_telling(self):
    # tell() cookies recorded while writing must match those observed
    # while reading the same lines back.  The file is managed by a
    # ``with`` block so it is closed even if an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail while the file is iterated.
            self.assertRaises(IOError, f.tell)
        # Once iteration has finished tell() must work again.
        self.assertEqual(f.tell(), p2)
2216
def test_seeking(self):
    # Build a line made of (chunk size - 2) ASCII characters followed by
    # a multi-byte character and a newline, store it twice, then check
    # read()/tell()/readline() around the end of the ASCII prefix.
    prefix_size = _default_chunk_size() - 2
    ascii_text = "a" * prefix_size
    ascii_bytes = ascii_text.encode("utf-8")
    # One byte per character in the prefix.
    self.assertEqual(len(ascii_text), len(ascii_bytes))
    tail_text = "\u8888\n"
    payload = ascii_bytes + tail_text.encode("utf-8")
    with self.open(support.TESTFN, "wb") as writer:
        writer.write(payload * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        head = reader.read(prefix_size)
        self.assertEqual(head, str(ascii_bytes, "ascii"))
        self.assertEqual(reader.tell(), prefix_size)
        self.assertEqual(reader.readline(), tail_text)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002233
def test_seeking_too(self):
    # Regression test for a specific bug
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as writer:
        writer.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        reader._CHUNK_SIZE  # Just test that it exists
        # Use a chunk size smaller than the three-byte character above.
        reader._CHUNK_SIZE = 2
        reader.readline()
        # Only checks that tell() does not raise here.
        reader.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002244
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a ``with`` block so that a failing
        # assertion cannot leave a handle open on the shared test file.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Going back to the cookie must replay the rest.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(case_input)

        # Position each test case so that it crosses a chunk boundary.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(case_input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + case_input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002291
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002292 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002293 data = "1234567890"
2294 tests = ("utf-16",
2295 "utf-16-le",
2296 "utf-16-be",
2297 "utf-32",
2298 "utf-32-le",
2299 "utf-32-be")
2300 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002301 buf = self.BytesIO()
2302 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002303 # Check if the BOM is written only once (see issue1753).
2304 f.write(data)
2305 f.write(data)
2306 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002307 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002308 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002309 self.assertEqual(f.read(), data * 2)
2310 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002311
Benjamin Petersona1b49012009-03-31 23:11:32 +00002312 def test_unreadable(self):
2313 class UnReadable(self.BytesIO):
2314 def readable(self):
2315 return False
2316 txt = self.TextIOWrapper(UnReadable())
2317 self.assertRaises(IOError, txt.read)
2318
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002319 def test_read_one_by_one(self):
2320 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002321 reads = ""
2322 while True:
2323 c = txt.read(1)
2324 if not c:
2325 break
2326 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002327 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002328
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002329 def test_readlines(self):
2330 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2331 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2332 txt.seek(0)
2333 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2334 txt.seek(0)
2335 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2336
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002337 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002338 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002339 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002340 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002341 reads = ""
2342 while True:
2343 c = txt.read(128)
2344 if not c:
2345 break
2346 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002347 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002348
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02002349 def test_writelines(self):
2350 l = ['ab', 'cd', 'ef']
2351 buf = self.BytesIO()
2352 txt = self.TextIOWrapper(buf)
2353 txt.writelines(l)
2354 txt.flush()
2355 self.assertEqual(buf.getvalue(), b'abcdef')
2356
2357 def test_writelines_userlist(self):
2358 l = UserList(['ab', 'cd', 'ef'])
2359 buf = self.BytesIO()
2360 txt = self.TextIOWrapper(buf)
2361 txt.writelines(l)
2362 txt.flush()
2363 self.assertEqual(buf.getvalue(), b'abcdef')
2364
2365 def test_writelines_error(self):
2366 txt = self.TextIOWrapper(self.BytesIO())
2367 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2368 self.assertRaises(TypeError, txt.writelines, None)
2369 self.assertRaises(TypeError, txt.writelines, b'abc')
2370
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002371 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002372 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002373
2374 # read one char at a time
2375 reads = ""
2376 while True:
2377 c = txt.read(1)
2378 if not c:
2379 break
2380 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002381 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002382
2383 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002384 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002385 txt._CHUNK_SIZE = 4
2386
2387 reads = ""
2388 while True:
2389 c = txt.read(4)
2390 if not c:
2391 break
2392 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002393 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002394
2395 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002396 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002397 txt._CHUNK_SIZE = 4
2398
2399 reads = txt.read(4)
2400 reads += txt.read(4)
2401 reads += txt.readline()
2402 reads += txt.readline()
2403 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002404 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002405
2406 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002407 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002408 txt._CHUNK_SIZE = 4
2409
2410 reads = txt.read(4)
2411 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002412 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002413
2414 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002415 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002416 txt._CHUNK_SIZE = 4
2417
2418 reads = txt.read(4)
2419 pos = txt.tell()
2420 txt.seek(0)
2421 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002422 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002423
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002424 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002425 buffer = self.BytesIO(self.testdata)
2426 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002427
2428 self.assertEqual(buffer.seekable(), txt.seekable())
2429
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN

    def raw_contents():
        # Return the bytes currently stored in the test file.
        with self.open(filename, 'rb') as binary:
            return binary.read()

    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # The returned cookie is not used; tell() is still called.
            f.tell()
        self.assertEqual(raw_contents(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        self.assertEqual(raw_contents(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002444
def test_seek_bom(self):
    # Same test, but when seeking manually
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_cookie = writer.tell()
        with self.open(filename, 'r+', encoding=charset) as updater:
            # Write after the existing text first, then overwrite the
            # start of the file.
            updater.seek(end_cookie)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(filename, 'rb') as binary:
            stored = binary.read()
            self.assertEqual(stored, 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002459
def test_errors_property(self):
    # Without an ``errors`` argument the property reads "strict".
    with self.open(support.TESTFN, "w") as default_file:
        self.assertEqual(default_file.errors, "strict")
    # An explicit ``errors`` argument is reported back unchanged.
    with self.open(support.TESTFN, "w", errors="replace") as lenient_file:
        self.assertEqual(lenient_file.errors, "replace")
2465
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    thread_count = 20
    line_format = "Thread%03d\n"
    start_signal = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = line_format % n
            # Hold every writer back until the event is set.
            start_signal.wait()
            f.write(text)

        workers = [threading.Thread(target=run, args=(x,))
                   for x in range(thread_count)]
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        start_signal.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for n in range(thread_count):
            self.assertEqual(content.count(line_format % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002488
Antoine Pitrou6be88762010-05-03 16:48:20 +00002489 def test_flush_error_on_close(self):
2490 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2491 def bad_flush():
2492 raise IOError()
2493 txt.flush = bad_flush
2494 self.assertRaises(IOError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06002495 self.assertTrue(txt.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00002496
2497 def test_multi_close(self):
2498 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2499 txt.close()
2500 txt.close()
2501 txt.close()
2502 self.assertRaises(ValueError, txt.flush)
2503
def test_unseekable(self):
    # tell() and seek() on a wrapper around an unseekable stream must
    # both raise UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    unsupported = self.UnsupportedOperation
    self.assertRaises(unsupported, wrapper.tell)
    self.assertRaises(unsupported, wrapper.seek, 0)
2508
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002509 def test_readonly_attributes(self):
2510 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2511 buf = self.BytesIO(self.testdata)
2512 with self.assertRaises(AttributeError):
2513 txt.buffer = buf
2514
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    remaining = list(wrapper)
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
2525
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    # No flush() has been called, yet everything has reached the raw
    # object.
    written = b''.join(raw._write_stack)
    self.assertEqual(written, b'123\n45')
2535
class CTextIOWrapperTest(TextIOWrapperTest):

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # Re-running __init__ with a bad ``newline`` must fail, and the
        # wrapper must refuse to read after each failed attempt.
        for error, bad_newline in ((TypeError, 42), (ValueError, 'xyzzy')):
            self.assertRaises(error, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(rawio)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Put the wrapper in a reference cycle so only the garbage
        # collector can reclaim it.
        wrapper.x = wrapper
        watcher = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(watcher() is None, watcher)
        with self.open(support.TESTFN, "rb") as stored:
            self.assertEqual(stored.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
2576
2577
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Adds no tests of its own; it only inherits the TextIOWrapperTest
    # cases under a separate class name.  Presumably the implementation
    # under test is bound to this class elsewhere in the file -- confirm.
    pass
2580
2581
2582class IncrementalNewlineDecoderTest(unittest.TestCase):
2583
2584 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002585 # UTF-8 specific tests for a newline decoder
2586 def _check_decode(b, s, **kwargs):
2587 # We exercise getstate() / setstate() as well as decode()
2588 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002589 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002590 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002591 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002592
Antoine Pitrou180a3362008-12-14 16:36:46 +00002593 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002594
Antoine Pitrou180a3362008-12-14 16:36:46 +00002595 _check_decode(b'\xe8', "")
2596 _check_decode(b'\xa2', "")
2597 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002598
Antoine Pitrou180a3362008-12-14 16:36:46 +00002599 _check_decode(b'\xe8', "")
2600 _check_decode(b'\xa2', "")
2601 _check_decode(b'\x88', "\u8888")
2602
2603 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002604 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2605
Antoine Pitrou180a3362008-12-14 16:36:46 +00002606 decoder.reset()
2607 _check_decode(b'\n', "\n")
2608 _check_decode(b'\r', "")
2609 _check_decode(b'', "\n", final=True)
2610 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002611
Antoine Pitrou180a3362008-12-14 16:36:46 +00002612 _check_decode(b'\r', "")
2613 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002614
Antoine Pitrou180a3362008-12-14 16:36:46 +00002615 _check_decode(b'\r\r\n', "\n\n")
2616 _check_decode(b'\r', "")
2617 _check_decode(b'\r', "\n")
2618 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002619
Antoine Pitrou180a3362008-12-14 16:36:46 +00002620 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2621 _check_decode(b'\xe8\xa2\x88', "\u8888")
2622 _check_decode(b'\n', "\n")
2623 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2624 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002625
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002626 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002627 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002628 if encoding is not None:
2629 encoder = codecs.getincrementalencoder(encoding)()
2630 def _decode_bytewise(s):
2631 # Decode one byte at a time
2632 for b in encoder.encode(s):
2633 result.append(decoder.decode(bytes([b])))
2634 else:
2635 encoder = None
2636 def _decode_bytewise(s):
2637 # Decode one char at a time
2638 for c in s:
2639 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002640 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002641 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002642 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002643 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002644 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002645 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002646 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002647 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002648 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002649 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002650 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002651 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002652 input = "abc"
2653 if encoder is not None:
2654 encoder.reset()
2655 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002656 self.assertEqual(decoder.decode(input), "abc")
2657 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002658
2659 def test_newline_decoder(self):
2660 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002661 # None meaning the IncrementalNewlineDecoder takes unicode input
2662 # rather than bytes input
2663 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002664 'utf-16', 'utf-16-le', 'utf-16-be',
2665 'utf-32', 'utf-32-le', 'utf-32-be',
2666 )
2667 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002668 decoder = enc and codecs.getincrementaldecoder(enc)()
2669 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2670 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002671 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002672 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2673 self.check_newline_decoding_utf8(decoder)
2674
Antoine Pitrou66913e22009-03-06 23:40:56 +00002675 def test_newline_bytes(self):
2676 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2677 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002678 self.assertEqual(dec.newlines, None)
2679 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2680 self.assertEqual(dec.newlines, None)
2681 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2682 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002683 dec = self.IncrementalNewlineDecoder(None, translate=False)
2684 _check(dec)
2685 dec = self.IncrementalNewlineDecoder(None, translate=True)
2686 _check(dec)
2687
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds no tests of its own; it only inherits the
    # IncrementalNewlineDecoderTest cases.  Presumably the decoder class
    # under test is attached to this class elsewhere in the file -- confirm.
    pass
2690
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds no tests of its own; it only inherits the
    # IncrementalNewlineDecoderTest cases.  Presumably the decoder class
    # under test is attached to this class elsewhere in the file -- confirm.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002693
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002694
Guido van Rossum01a27522007-03-07 01:00:12 +00002695# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002696
Guido van Rossum5abbf752007-08-27 17:39:33 +00002697class MiscIOTest(unittest.TestCase):
2698
def tearDown(self):
    # Remove the shared scratch file that the tests in this class open.
    support.unlink(support.TESTFN)
2701
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002702 def test___all__(self):
2703 for name in self.io.__all__:
2704 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002705 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002706 if name == "open":
2707 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002708 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002709 self.assertTrue(issubclass(obj, Exception), name)
2710 elif not name.startswith("SEEK_"):
2711 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002712
def test_attributes(self):
    # Check the ``name`` and ``mode`` attributes reported by each layer
    # of the objects returned by open().  Every file is managed by a
    # ``with`` block so that a failing assertion cannot leak a handle.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    with self.open(support.TESTFN, "U") as f:
        self.assertEqual(f.name, support.TESTFN)
        self.assertEqual(f.buffer.name, support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")

    with self.open(support.TESTFN, "w+") as f:
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # A second object on the same descriptor; closefd=False leaves
        # the descriptor owned by ``f``.
        with self.open(f.fileno(), "wb", closefd=False) as g:
            self.assertEqual(g.mode, "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name, f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
2739
def test_io_after_close(self):
    # Every I/O method of a closed file must raise ValueError, for each
    # combination of mode and buffering below.
    def expect_closed_error(f, method_name, *args, optional=False):
        # Optional methods are only checked on objects that have them.
        if optional and not hasattr(f, method_name):
            return
        self.assertRaises(ValueError, getattr(f, method_name), *args)

    open_variants = [
        {"mode": "w"},
        {"mode": "wb"},
        {"mode": "w", "buffering": 1},
        {"mode": "w", "buffering": 2},
        {"mode": "wb", "buffering": 0},
        {"mode": "r"},
        {"mode": "rb"},
        {"mode": "r", "buffering": 1},
        {"mode": "r", "buffering": 2},
        {"mode": "rb", "buffering": 0},
        {"mode": "w+"},
        {"mode": "w+b"},
        {"mode": "w+", "buffering": 1},
        {"mode": "w+", "buffering": 2},
        {"mode": "w+b", "buffering": 0},
    ]
    for kwargs in open_variants:
        f = self.open(support.TESTFN, **kwargs)
        f.close()
        expect_closed_error(f, "flush")
        expect_closed_error(f, "fileno")
        expect_closed_error(f, "isatty")
        expect_closed_error(f, "__iter__")
        expect_closed_error(f, "peek", 1, optional=True)
        expect_closed_error(f, "read")
        expect_closed_error(f, "read1", 1024, optional=True)
        expect_closed_error(f, "readall", optional=True)
        expect_closed_error(f, "readinto", bytearray(1024), optional=True)
        expect_closed_error(f, "readline")
        expect_closed_error(f, "readlines")
        expect_closed_error(f, "seek", 0)
        expect_closed_error(f, "tell")
        expect_closed_error(f, "truncate")
        # Binary files take bytes, text files take str.
        empty = b"" if "b" in kwargs['mode'] else ""
        expect_closed_error(f, "write", empty)
        expect_closed_error(f, "writelines", [])
        self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002782
def test_blockingioerror(self):
    # Various BlockingIOError issues
    class StrWithAttributes(str):
        # A str subclass, so that instances accept attributes and
        # weak references.
        pass
    payload = StrWithAttributes("")
    error = self.BlockingIOError(1, payload)
    # Tie the exception and its argument into a reference cycle.
    payload.b = error
    error.c = payload
    wr = weakref.ref(payload)
    del payload
    del error
    # After a collection nothing may keep the argument alive.
    support.gc_collect()
    self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002795
def test_abcs(self):
    # Test the visible base classes are ABCs.
    visible_bases = (
        self.IOBase,
        self.RawIOBase,
        self.BufferedIOBase,
        self.TextIOBase,
    )
    for base in visible_bases:
        self.assertIsInstance(base, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002802
def _check_abc_inheritance(self, abcmodule):
    # Open a file as a raw, a buffered and a text object in turn, and
    # check each one against the ABCs found in *abcmodule*: every object
    # is an IOBase and an instance of exactly one of the three layer ABCs.
    layer_names = ("RawIOBase", "BufferedIOBase", "TextIOBase")
    # (mode, extra open() keywords, name of the one layer ABC expected)
    cases = (
        ("wb", {"buffering": 0}, "RawIOBase"),
        ("wb", {}, "BufferedIOBase"),
        ("w", {}, "TextIOBase"),
    )
    for mode, extra, expected_layer in cases:
        with self.open(support.TESTFN, mode, **extra) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            for layer_name in layer_names:
                layer = getattr(abcmodule, layer_name)
                if layer_name == expected_layer:
                    self.assertIsInstance(f, layer)
                else:
                    self.assertNotIsInstance(f, layer)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002819
def test_abc_inheritance(self):
    # Test implementations inherit from their respective ABCs
    # (the test case itself is passed as the namespace in which the ABCs
    # are looked up).
    self._check_abc_inheritance(self)
2823
def test_abc_inheritance_official(self):
    # Test implementations inherit from the official ABCs of the
    # baseline "io" module, whichever implementation self.open comes from.
    self._check_abc_inheritance(io)
2828
def _check_warn_on_dealloc(self, *args, **kwargs):
    # Open a file with the given open() arguments, drop the only
    # reference to it without closing it, and expect a ResourceWarning
    # whose message contains the repr of the abandoned object.
    fileobj = open(*args, **kwargs)
    expected_repr = repr(fileobj)
    with self.assertWarns(ResourceWarning) as caught:
        del fileobj
        support.gc_collect()
    message = str(caught.warning.args[0])
    self.assertIn(expected_repr, message)
2836
def test_warn_on_dealloc(self):
    # Unbuffered binary, buffered binary, then text.
    variants = (
        ("wb", {"buffering": 0}),
        ("wb", {}),
        ("w", {}),
    )
    for mode, extra in variants:
        self._check_warn_on_dealloc(support.TESTFN, mode, **extra)
2841
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
    # Same check as _check_warn_on_dealloc(), but for a file object built
    # on a pipe descriptor; *args and **kwargs follow the descriptor in
    # the open() call.
    pipe_fds = []

    def close_pipe_fds():
        # A descriptor that is already closed reports EBADF, which is
        # ignored; any other error is propagated.
        for fd in pipe_fds:
            try:
                os.close(fd)
            except EnvironmentError as exc:
                if exc.errno == errno.EBADF:
                    continue
                raise
    self.addCleanup(close_pipe_fds)

    read_end, write_end = os.pipe()
    pipe_fds.extend((read_end, write_end))
    self._check_warn_on_dealloc(read_end, *args, **kwargs)

    # When using closefd=False, there's no warning
    read_end, write_end = os.pipe()
    pipe_fds.extend((read_end, write_end))
    with warnings.catch_warnings(record=True) as caught:
        open(read_end, *args, closefd=False, **kwargs)
        support.gc_collect()
    self.assertEqual(caught, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002862
def test_warn_on_dealloc_fd(self):
    # Unbuffered binary, buffered binary, then text.
    variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, extra in variants:
        self._check_warn_on_dealloc_fd(mode, **extra)
2867
2868
def test_pickling(self):
    # Pickling file objects is forbidden
    # For each base mode try the text object, the buffered binary object
    # and the unbuffered binary object, in that order.  "w" comes first,
    # so the file exists by the time it is opened for reading.
    open_variants = []
    for base_mode in ("w", "r", "w+"):
        binary_mode = base_mode + "b"
        open_variants.append({"mode": base_mode})
        open_variants.append({"mode": binary_mode})
        open_variants.append({"mode": binary_mode, "buffering": 0})

    protocols = range(pickle.HIGHEST_PROTOCOL + 1)
    for open_kwargs in open_variants:
        for proto in protocols:
            with self.open(support.TESTFN, **open_kwargs) as fileobj:
                self.assertRaises(TypeError, pickle.dumps, fileobj, proto)
2885
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_bigbuf(self):
    # Non-blocking pipe round trip with a 16 KiB buffer on both ends.
    self._test_nonblock_pipe_write(16*1024)
2889
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_smallbuf(self):
    # Non-blocking pipe round trip with a 1 KiB buffer on both ends.
    self._test_nonblock_pipe_write(1024)
2893
def _set_non_blocking(self, fd):
    # Add O_NONBLOCK to the status flags of *fd*, keeping the flags that
    # are already set, and check that both fcntl() calls succeeded.
    current_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    self.assertNotEqual(current_flags, -1)
    wanted_flags = current_flags | os.O_NONBLOCK
    status = fcntl.fcntl(fd, fcntl.F_SETFL, wanted_flags)
    self.assertEqual(status, 0)
2899
def _test_nonblock_pipe_write(self, bufsize):
    # Push data through a non-blocking pipe wrapped in buffered objects
    # created with buffering=*bufsize*, and check that the bytes read
    # back equal the bytes accounted as sent.
    sent = []
    received = []
    r, w = os.pipe()
    self._set_non_blocking(r)
    self._set_non_blocking(w)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes.  For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
    wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

    with rf, wf:
        for N in 9999, 73, 7574:
            try:
                i = 0
                # Keep writing N-byte messages (one repeated lowercase
                # letter each, cycling a..z) until the writer raises
                # BlockingIOError.
                while True:
                    msg = bytes([i % 26 + 97]) * N
                    sent.append(msg)
                    wf.write(msg)
                    i += 1

            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                # Only the prefix reported as written counts as sent.
                sent[-1] = sent[-1][:e.characters_written]
                # Read what is available, then write a short marker.
                received.append(rf.read())
                msg = b'BLOCKED'
                wf.write(msg)
                sent.append(msg)

        # Retry flush() until it completes, reading from the other end
        # after each BlockingIOError.
        while True:
            try:
                wf.flush()
                break
            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                self.assertEqual(e.characters_written, 0)
                received.append(rf.read())

        # Collect the remaining data: keep calling rf.read() until it
        # returns None.
        received += iter(rf.read, None)

    sent, received = b''.join(sent), b''.join(received)
    self.assertTrue(sent == received)
    # Leaving the `with` block must have closed both objects.
    self.assertTrue(wf.closed)
    self.assertTrue(rf.closed)
2949
def test_create_fail(self):
    # 'x' mode fails if file is existing
    # Make sure the file is there first.
    self.open(support.TESTFN, 'w').close()
    with self.assertRaises(FileExistsError):
        self.open(support.TESTFN, 'x')
2955
def test_create_writes(self):
    # 'x' mode opens for writing
    payload = b"spam"
    with self.open(support.TESTFN, 'xb') as created:
        created.write(payload)
    # Read the file back through a separate handle.
    with self.open(support.TESTFN, 'rb') as reopened:
        contents = reopened.read()
    self.assertEqual(payload, contents)
2962
def test_open_allargs(self):
    # there used to be a buffer overflow in the parser for rawmode
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, 'rwax+')
2966
2967
class CMiscIOTest(MiscIOTest):
    # MiscIOTest bound to the `io` module.
    io = io
2970
class PyMiscIOTest(MiscIOTest):
    # MiscIOTest bound to the `pyio` module.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002973
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002974
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Behaviour of file objects when SIGALRM interrupts a read or write.

    Subclasses select the implementation under test through the ``io``
    class attribute.  Every helper that arms an alarm cancels it again in
    its ``finally`` clause, so that a failure occurring before the alarm
    fires cannot leave a pending SIGALRM behind once tearDown() has
    restored the previous handler.
    """

    def setUp(self):
        # Install the default handler for these tests and remember the
        # previous one for tearDown().
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raises ZeroDivisionError from inside the handler.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data repeated to build the large write,
        *bytes* holds the bytes expected to be read from the pipe, and
        *fdopen_kwargs* are passed to open() for the write end."""
        read_results = []
        def _read():
            # Where pthread_sigmask() exists, block SIGALRM in this thread.
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                        wio.write, item * (support.PIPE_MAX_SIZE // len(item)))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            # Cancel the alarm in case we got here before it fired.
            signal.alarm(0)
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            try:
                wio.close()
            except IOError as e:
                if e.errno != errno.EBADF:
                    raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check what happens when the SIGALRM handler writes to a file
        object that is itself in the middle of a write.

        *data* is the chunk written both by the main loop and by the
        handler; *fdopen_kwargs* are passed to open() for the write end."""
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel the alarm in case we got here before it fired.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() into the str that
        is compared with "foobar"; *fdopen_kwargs* are passed to open()
        for the read end."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            # Supply the second half of the data from inside the handler.
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel the alarm in case we got here before it fired.
            signal.alarm(0)
            rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is a single unit of data, written N times in one call;
        *fdopen_kwargs* are passed to open() for the write end."""
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = 1024 * 1024
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        def _read():
            # Drain the pipe until the writer is done; the select()
            # timeout makes the flag get re-checked at least once a second.
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: switch to the second handler and re-arm.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start the reader so that the write can proceed.
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel whichever alarm may still be pending.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            try:
                wio.close()
            except IOError as e:
                if e.errno != errno.EBADF:
                    raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3164
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003165
class CSignalsTest(SignalsTest):
    # SignalsTest bound to the `io` module.
    io = io
3168
3169class PySignalsTest(SignalsTest):
3170 io = pyio
3171
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003172 # Handling reentrancy issues would slow down _pyio even more, so the
3173 # tests are disabled.
3174 test_reentrant_write_buffered = None
3175 test_reentrant_write_text = None
3176
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003177
def test_main():
    # Attach the right implementation namespace to every test case class,
    # then run them all.
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    public_names = io.__all__ + ["IncrementalNewlineDecoder"]
    module_globals = globals()

    # One namespace per class-name prefix: the public names of the module,
    # plus the prefixed variant of every mock class under its plain name.
    namespaces = {}
    for prefix, module in (("C", io), ("Py", pyio)):
        namespace = {name: getattr(module, name) for name in public_names}
        for mock in mocks:
            namespace[mock.__name__] = module_globals[prefix + mock.__name__]
        namespaces[prefix] = namespace
    # Avoid turning open into a bound method.
    namespaces["Py"]["open"] = pyio.OpenWrapper

    # Classes whose name starts with neither prefix are left untouched.
    for test in tests:
        for prefix in ("C", "Py"):
            if test.__name__.startswith(prefix):
                for name, obj in namespaces[prefix].items():
                    setattr(test, name, obj)
                break

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00003212
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()