blob: ed8564cc173b69619689850f2da59ab5e0400156 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000031import time
Guido van Rossum28524c72007-02-27 05:47:44 +000032import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000033import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020034import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000037from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000038
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000039import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000040import io # C implementation of io
41import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000042try:
43 import threading
44except ImportError:
45 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010046try:
47 import fcntl
48except ImportError:
49 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000050
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000051def _default_chunk_size():
52 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000053 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000054 return f._CHUNK_SIZE
55
56
Antoine Pitrou328ec742010-09-14 18:37:24 +000057class MockRawIOWithoutRead:
58 """A RawIO implementation without read(), so as to exercise the default
59 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000060
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000061 def __init__(self, read_stack=()):
62 self._read_stack = list(read_stack)
63 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000065 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000066
Guido van Rossum01a27522007-03-07 01:00:12 +000067 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000068 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000069 return len(b)
70
71 def writable(self):
72 return True
73
Guido van Rossum68bbcd22007-02-27 17:19:33 +000074 def fileno(self):
75 return 42
76
77 def readable(self):
78 return True
79
Guido van Rossum01a27522007-03-07 01:00:12 +000080 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000081 return True
82
Guido van Rossum01a27522007-03-07 01:00:12 +000083 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000084 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000085
86 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000087 return 0 # same comment as above
88
89 def readinto(self, buf):
90 self._reads += 1
91 max_len = len(buf)
92 try:
93 data = self._read_stack[0]
94 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000095 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000096 return 0
97 if data is None:
98 del self._read_stack[0]
99 return None
100 n = len(data)
101 if len(data) <= max_len:
102 del self._read_stack[0]
103 buf[:n] = data
104 return n
105 else:
106 buf[:] = data[:max_len]
107 self._read_stack[0] = data[max_len:]
108 return max_len
109
110 def truncate(self, pos=None):
111 return pos
112
Antoine Pitrou328ec742010-09-14 18:37:24 +0000113class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
114 pass
115
116class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
117 pass
118
119
120class MockRawIO(MockRawIOWithoutRead):
121
122 def read(self, n=None):
123 self._reads += 1
124 try:
125 return self._read_stack.pop(0)
126 except:
127 self._extraneous_reads += 1
128 return b""
129
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000130class CMockRawIO(MockRawIO, io.RawIOBase):
131 pass
132
133class PyMockRawIO(MockRawIO, pyio.RawIOBase):
134 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000135
Guido van Rossuma9e20242007-03-08 00:43:48 +0000136
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000137class MisbehavedRawIO(MockRawIO):
138 def write(self, b):
139 return super().write(b) * 2
140
141 def read(self, n=None):
142 return super().read(n) * 2
143
144 def seek(self, pos, whence):
145 return -123
146
147 def tell(self):
148 return -456
149
150 def readinto(self, buf):
151 super().readinto(buf)
152 return len(buf) * 5
153
154class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
155 pass
156
157class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
158 pass
159
160
161class CloseFailureIO(MockRawIO):
162 closed = 0
163
164 def close(self):
165 if not self.closed:
166 self.closed = 1
167 raise IOError
168
169class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
170 pass
171
172class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
173 pass
174
175
176class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000177
178 def __init__(self, data):
179 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000181
182 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000183 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000184 self.read_history.append(None if res is None else len(res))
185 return res
186
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000187 def readinto(self, b):
188 res = super().readinto(b)
189 self.read_history.append(res)
190 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000191
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000192class CMockFileIO(MockFileIO, io.BytesIO):
193 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000194
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000195class PyMockFileIO(MockFileIO, pyio.BytesIO):
196 pass
197
198
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000199class MockUnseekableIO:
200 def seekable(self):
201 return False
202
203 def seek(self, *args):
204 raise self.UnsupportedOperation("not seekable")
205
206 def tell(self, *args):
207 raise self.UnsupportedOperation("not seekable")
208
209class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
210 UnsupportedOperation = io.UnsupportedOperation
211
212class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
213 UnsupportedOperation = pyio.UnsupportedOperation
214
215
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000216class MockNonBlockWriterIO:
217
218 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000219 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000220 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000222 def pop_written(self):
223 s = b"".join(self._write_stack)
224 self._write_stack[:] = []
225 return s
226
227 def block_on(self, char):
228 """Block when a given char is encountered."""
229 self._blocker_char = char
230
231 def readable(self):
232 return True
233
234 def seekable(self):
235 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000236
Guido van Rossum01a27522007-03-07 01:00:12 +0000237 def writable(self):
238 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000239
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000240 def write(self, b):
241 b = bytes(b)
242 n = -1
243 if self._blocker_char:
244 try:
245 n = b.index(self._blocker_char)
246 except ValueError:
247 pass
248 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100249 if n > 0:
250 # write data up to the first blocker
251 self._write_stack.append(b[:n])
252 return n
253 else:
254 # cancel blocker and indicate would block
255 self._blocker_char = None
256 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000257 self._write_stack.append(b)
258 return len(b)
259
260class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
261 BlockingIOError = io.BlockingIOError
262
263class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
264 BlockingIOError = pyio.BlockingIOError
265
Guido van Rossuma9e20242007-03-08 00:43:48 +0000266
Guido van Rossum28524c72007-02-27 05:47:44 +0000267class IOTest(unittest.TestCase):
268
Neal Norwitze7789b12008-03-24 06:18:09 +0000269 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000270 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000271
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000272 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000273 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000274
Guido van Rossum28524c72007-02-27 05:47:44 +0000275 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000276 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000277 f.truncate(0)
278 self.assertEqual(f.tell(), 5)
279 f.seek(0)
280
281 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000282 self.assertEqual(f.seek(0), 0)
283 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000284 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000285 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000286 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000287 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000288 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000289 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000290 self.assertEqual(f.seek(-1, 2), 13)
291 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000292
Guido van Rossum87429772007-04-10 21:06:59 +0000293 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000294 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000295 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000296
Guido van Rossum9b76da62007-04-11 01:09:03 +0000297 def read_ops(self, f, buffered=False):
298 data = f.read(5)
299 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000300 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000301 self.assertEqual(f.readinto(data), 5)
302 self.assertEqual(data, b" worl")
303 self.assertEqual(f.readinto(data), 2)
304 self.assertEqual(len(data), 5)
305 self.assertEqual(data[:2], b"d\n")
306 self.assertEqual(f.seek(0), 0)
307 self.assertEqual(f.read(20), b"hello world\n")
308 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000309 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000310 self.assertEqual(f.seek(-6, 2), 6)
311 self.assertEqual(f.read(5), b"world")
312 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000313 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000314 self.assertEqual(f.seek(-6, 1), 5)
315 self.assertEqual(f.read(5), b" worl")
316 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000317 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000318 if buffered:
319 f.seek(0)
320 self.assertEqual(f.read(), b"hello world\n")
321 f.seek(6)
322 self.assertEqual(f.read(), b"world\n")
323 self.assertEqual(f.read(), b"")
324
Guido van Rossum34d69e52007-04-10 20:08:41 +0000325 LARGE = 2**31
326
Guido van Rossum53807da2007-04-10 19:01:47 +0000327 def large_file_ops(self, f):
328 assert f.readable()
329 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000330 self.assertEqual(f.seek(self.LARGE), self.LARGE)
331 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000332 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000333 self.assertEqual(f.tell(), self.LARGE + 3)
334 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000335 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000336 self.assertEqual(f.tell(), self.LARGE + 2)
337 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000338 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000339 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000340 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
341 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000342 self.assertEqual(f.read(2), b"x")
343
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000344 def test_invalid_operations(self):
345 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000346 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000347 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000348 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000349 self.assertRaises(exc, fp.read)
350 self.assertRaises(exc, fp.readline)
351 with self.open(support.TESTFN, "wb", buffering=0) as fp:
352 self.assertRaises(exc, fp.read)
353 self.assertRaises(exc, fp.readline)
354 with self.open(support.TESTFN, "rb", buffering=0) as fp:
355 self.assertRaises(exc, fp.write, b"blah")
356 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000357 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000358 self.assertRaises(exc, fp.write, b"blah")
359 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000360 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000361 self.assertRaises(exc, fp.write, "blah")
362 self.assertRaises(exc, fp.writelines, ["blah\n"])
363 # Non-zero seeking from current or end pos
364 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
365 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000366
Antoine Pitrou13348842012-01-29 18:36:34 +0100367 def test_open_handles_NUL_chars(self):
368 fn_with_NUL = 'foo\0bar'
369 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')
370 self.assertRaises(TypeError, self.open, bytes(fn_with_NUL, 'ascii'), 'w')
371
Guido van Rossum28524c72007-02-27 05:47:44 +0000372 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000373 with self.open(support.TESTFN, "wb", buffering=0) as f:
374 self.assertEqual(f.readable(), False)
375 self.assertEqual(f.writable(), True)
376 self.assertEqual(f.seekable(), True)
377 self.write_ops(f)
378 with self.open(support.TESTFN, "rb", buffering=0) as f:
379 self.assertEqual(f.readable(), True)
380 self.assertEqual(f.writable(), False)
381 self.assertEqual(f.seekable(), True)
382 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000383
Guido van Rossum87429772007-04-10 21:06:59 +0000384 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000385 with self.open(support.TESTFN, "wb") as f:
386 self.assertEqual(f.readable(), False)
387 self.assertEqual(f.writable(), True)
388 self.assertEqual(f.seekable(), True)
389 self.write_ops(f)
390 with self.open(support.TESTFN, "rb") as f:
391 self.assertEqual(f.readable(), True)
392 self.assertEqual(f.writable(), False)
393 self.assertEqual(f.seekable(), True)
394 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000395
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000396 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000397 with self.open(support.TESTFN, "wb") as f:
398 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
399 with self.open(support.TESTFN, "rb") as f:
400 self.assertEqual(f.readline(), b"abc\n")
401 self.assertEqual(f.readline(10), b"def\n")
402 self.assertEqual(f.readline(2), b"xy")
403 self.assertEqual(f.readline(4), b"zzy\n")
404 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000405 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000406 self.assertRaises(TypeError, f.readline, 5.3)
407 with self.open(support.TESTFN, "r") as f:
408 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000409
Guido van Rossum28524c72007-02-27 05:47:44 +0000410 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000411 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000412 self.write_ops(f)
413 data = f.getvalue()
414 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000415 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000416 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000417
Guido van Rossum53807da2007-04-10 19:01:47 +0000418 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000419 # On Windows and Mac OSX this test comsumes large resources; It takes
420 # a long time to build the >2GB file and takes >2GB of disk space
421 # therefore the resource must be enabled to run this test.
422 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000423 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000424 print("\nTesting large file ops skipped on %s." % sys.platform,
425 file=sys.stderr)
426 print("It requires %d bytes and a long time." % self.LARGE,
427 file=sys.stderr)
428 print("Use 'regrtest.py -u largefile test_io' to run it.",
429 file=sys.stderr)
430 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000431 with self.open(support.TESTFN, "w+b", 0) as f:
432 self.large_file_ops(f)
433 with self.open(support.TESTFN, "w+b") as f:
434 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000435
436 def test_with_open(self):
437 for bufsize in (0, 1, 100):
438 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000439 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000440 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000441 self.assertEqual(f.closed, True)
442 f = None
443 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000444 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000445 1/0
446 except ZeroDivisionError:
447 self.assertEqual(f.closed, True)
448 else:
449 self.fail("1/0 didn't raise an exception")
450
Antoine Pitrou08838b62009-01-21 00:55:13 +0000451 # issue 5008
452 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000454 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000455 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000456 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000457 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000458 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000459 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000460 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000461
Guido van Rossum87429772007-04-10 21:06:59 +0000462 def test_destructor(self):
463 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000464 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000465 def __del__(self):
466 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000467 try:
468 f = super().__del__
469 except AttributeError:
470 pass
471 else:
472 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000473 def close(self):
474 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000475 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000476 def flush(self):
477 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000478 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000479 with support.check_warnings(('', ResourceWarning)):
480 f = MyFileIO(support.TESTFN, "wb")
481 f.write(b"xxx")
482 del f
483 support.gc_collect()
484 self.assertEqual(record, [1, 2, 3])
485 with self.open(support.TESTFN, "rb") as f:
486 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000487
488 def _check_base_destructor(self, base):
489 record = []
490 class MyIO(base):
491 def __init__(self):
492 # This exercises the availability of attributes on object
493 # destruction.
494 # (in the C version, close() is called by the tp_dealloc
495 # function, not by __del__)
496 self.on_del = 1
497 self.on_close = 2
498 self.on_flush = 3
499 def __del__(self):
500 record.append(self.on_del)
501 try:
502 f = super().__del__
503 except AttributeError:
504 pass
505 else:
506 f()
507 def close(self):
508 record.append(self.on_close)
509 super().close()
510 def flush(self):
511 record.append(self.on_flush)
512 super().flush()
513 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000514 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000515 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000516 self.assertEqual(record, [1, 2, 3])
517
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000518 def test_IOBase_destructor(self):
519 self._check_base_destructor(self.IOBase)
520
521 def test_RawIOBase_destructor(self):
522 self._check_base_destructor(self.RawIOBase)
523
524 def test_BufferedIOBase_destructor(self):
525 self._check_base_destructor(self.BufferedIOBase)
526
527 def test_TextIOBase_destructor(self):
528 self._check_base_destructor(self.TextIOBase)
529
Guido van Rossum87429772007-04-10 21:06:59 +0000530 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000531 with self.open(support.TESTFN, "wb") as f:
532 f.write(b"xxx")
533 with self.open(support.TESTFN, "rb") as f:
534 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000535
Guido van Rossumd4103952007-04-12 05:44:49 +0000536 def test_array_writes(self):
537 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000538 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000539 with self.open(support.TESTFN, "wb", 0) as f:
540 self.assertEqual(f.write(a), n)
541 with self.open(support.TESTFN, "wb") as f:
542 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000543
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000544 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000545 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000546 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000547
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000548 def test_read_closed(self):
549 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000550 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000551 with self.open(support.TESTFN, "r") as f:
552 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000553 self.assertEqual(file.read(), "egg\n")
554 file.seek(0)
555 file.close()
556 self.assertRaises(ValueError, file.read)
557
558 def test_no_closefd_with_filename(self):
559 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000560 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000561
562 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000563 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000564 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000565 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000566 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000567 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000568 self.assertEqual(file.buffer.raw.closefd, False)
569
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000570 def test_garbage_collection(self):
571 # FileIO objects are collected, and collecting them flushes
572 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000573 with support.check_warnings(('', ResourceWarning)):
574 f = self.FileIO(support.TESTFN, "wb")
575 f.write(b"abcxxx")
576 f.f = f
577 wr = weakref.ref(f)
578 del f
579 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000580 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000581 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000582 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000583
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000584 def test_unbounded_file(self):
585 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
586 zero = "/dev/zero"
587 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000588 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000589 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000590 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000591 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000592 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000593 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000594 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000595 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000596 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000597 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000598 self.assertRaises(OverflowError, f.read)
599
Antoine Pitrou6be88762010-05-03 16:48:20 +0000600 def test_flush_error_on_close(self):
601 f = self.open(support.TESTFN, "wb", buffering=0)
602 def bad_flush():
603 raise IOError()
604 f.flush = bad_flush
605 self.assertRaises(IOError, f.close) # exception not swallowed
606
607 def test_multi_close(self):
608 f = self.open(support.TESTFN, "wb", buffering=0)
609 f.close()
610 f.close()
611 f.close()
612 self.assertRaises(ValueError, f.flush)
613
Antoine Pitrou328ec742010-09-14 18:37:24 +0000614 def test_RawIOBase_read(self):
615 # Exercise the default RawIOBase.read() implementation (which calls
616 # readinto() internally).
617 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
618 self.assertEqual(rawio.read(2), b"ab")
619 self.assertEqual(rawio.read(2), b"c")
620 self.assertEqual(rawio.read(2), b"d")
621 self.assertEqual(rawio.read(2), None)
622 self.assertEqual(rawio.read(2), b"ef")
623 self.assertEqual(rawio.read(2), b"g")
624 self.assertEqual(rawio.read(2), None)
625 self.assertEqual(rawio.read(2), b"")
626
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400627 def test_types_have_dict(self):
628 test = (
629 self.IOBase(),
630 self.RawIOBase(),
631 self.TextIOBase(),
632 self.StringIO(),
633 self.BytesIO()
634 )
635 for obj in test:
636 self.assertTrue(hasattr(obj, "__dict__"))
637
Ross Lagerwall59142db2011-10-31 20:34:46 +0200638 def test_opener(self):
639 with self.open(support.TESTFN, "w") as f:
640 f.write("egg\n")
641 fd = os.open(support.TESTFN, os.O_RDONLY)
642 def opener(path, flags):
643 return fd
644 with self.open("non-existent", "r", opener=opener) as f:
645 self.assertEqual(f.read(), "egg\n")
646
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200647 def test_fileio_closefd(self):
648 # Issue #4841
649 with self.open(__file__, 'rb') as f1, \
650 self.open(__file__, 'rb') as f2:
651 fileio = self.FileIO(f1.fileno(), closefd=False)
652 # .__init__() must not close f1
653 fileio.__init__(f2.fileno(), closefd=False)
654 f1.readline()
655 # .close() must not close f2
656 fileio.close()
657 f2.readline()
658
659
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000660class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200661
662 def test_IOBase_finalize(self):
663 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
664 # class which inherits IOBase and an object of this class are caught
665 # in a reference cycle and close() is already in the method cache.
666 class MyIO(self.IOBase):
667 def close(self):
668 pass
669
670 # create an instance to populate the method cache
671 MyIO()
672 obj = MyIO()
673 obj.obj = obj
674 wr = weakref.ref(obj)
675 del MyIO
676 del obj
677 support.gc_collect()
678 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000679
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000680class PyIOTest(IOTest):
681 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000682
Guido van Rossuma9e20242007-03-08 00:43:48 +0000683
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000684class CommonBufferedTests:
685 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
686
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000687 def test_detach(self):
688 raw = self.MockRawIO()
689 buf = self.tp(raw)
690 self.assertIs(buf.detach(), raw)
691 self.assertRaises(ValueError, buf.detach)
692
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000693 def test_fileno(self):
694 rawio = self.MockRawIO()
695 bufio = self.tp(rawio)
696
Ezio Melottib3aedd42010-11-20 19:04:17 +0000697 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000698
699 def test_no_fileno(self):
700 # XXX will we always have fileno() function? If so, kill
701 # this test. Else, write it.
702 pass
703
704 def test_invalid_args(self):
705 rawio = self.MockRawIO()
706 bufio = self.tp(rawio)
707 # Invalid whence
708 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +0200709 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000710
711 def test_override_destructor(self):
712 tp = self.tp
713 record = []
714 class MyBufferedIO(tp):
715 def __del__(self):
716 record.append(1)
717 try:
718 f = super().__del__
719 except AttributeError:
720 pass
721 else:
722 f()
723 def close(self):
724 record.append(2)
725 super().close()
726 def flush(self):
727 record.append(3)
728 super().flush()
729 rawio = self.MockRawIO()
730 bufio = MyBufferedIO(rawio)
731 writable = bufio.writable()
732 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000733 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000734 if writable:
735 self.assertEqual(record, [1, 2, 3])
736 else:
737 self.assertEqual(record, [1, 2])
738
739 def test_context_manager(self):
740 # Test usability as a context manager
741 rawio = self.MockRawIO()
742 bufio = self.tp(rawio)
743 def _with():
744 with bufio:
745 pass
746 _with()
747 # bufio should now be closed, and using it a second time should raise
748 # a ValueError.
749 self.assertRaises(ValueError, _with)
750
751 def test_error_through_destructor(self):
752 # Test that the exception state is not modified by a destructor,
753 # even if close() fails.
754 rawio = self.CloseFailureIO()
755 def f():
756 self.tp(rawio).xyzzy
757 with support.captured_output("stderr") as s:
758 self.assertRaises(AttributeError, f)
759 s = s.getvalue().strip()
760 if s:
761 # The destructor *may* have printed an unraisable error, check it
762 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000763 self.assertTrue(s.startswith("Exception IOError: "), s)
764 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000765
Antoine Pitrou716c4442009-05-23 19:04:03 +0000766 def test_repr(self):
767 raw = self.MockRawIO()
768 b = self.tp(raw)
769 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
770 self.assertEqual(repr(b), "<%s>" % clsname)
771 raw.name = "dummy"
772 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
773 raw.name = b"dummy"
774 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
775
Antoine Pitrou6be88762010-05-03 16:48:20 +0000776 def test_flush_error_on_close(self):
777 raw = self.MockRawIO()
778 def bad_flush():
779 raise IOError()
780 raw.flush = bad_flush
781 b = self.tp(raw)
782 self.assertRaises(IOError, b.close) # exception not swallowed
783
784 def test_multi_close(self):
785 raw = self.MockRawIO()
786 b = self.tp(raw)
787 b.close()
788 b.close()
789 b.close()
790 self.assertRaises(ValueError, b.flush)
791
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000792 def test_unseekable(self):
793 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
794 self.assertRaises(self.UnsupportedOperation, bufio.tell)
795 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
796
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000797 def test_readonly_attributes(self):
798 raw = self.MockRawIO()
799 buf = self.tp(raw)
800 x = self.MockRawIO()
801 with self.assertRaises(AttributeError):
802 buf.raw = x
803
Guido van Rossum78892e42007-04-06 17:31:18 +0000804
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000805class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
806 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000807
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000808 def test_constructor(self):
809 rawio = self.MockRawIO([b"abc"])
810 bufio = self.tp(rawio)
811 bufio.__init__(rawio)
812 bufio.__init__(rawio, buffer_size=1024)
813 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000814 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000815 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
816 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
817 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
818 rawio = self.MockRawIO([b"abc"])
819 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000820 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000821
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000822 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000823 for arg in (None, 7):
824 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
825 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000826 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000827 # Invalid args
828 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000829
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000830 def test_read1(self):
831 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
832 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000833 self.assertEqual(b"a", bufio.read(1))
834 self.assertEqual(b"b", bufio.read1(1))
835 self.assertEqual(rawio._reads, 1)
836 self.assertEqual(b"c", bufio.read1(100))
837 self.assertEqual(rawio._reads, 1)
838 self.assertEqual(b"d", bufio.read1(100))
839 self.assertEqual(rawio._reads, 2)
840 self.assertEqual(b"efg", bufio.read1(100))
841 self.assertEqual(rawio._reads, 3)
842 self.assertEqual(b"", bufio.read1(100))
843 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000844 # Invalid args
845 self.assertRaises(ValueError, bufio.read1, -1)
846
847 def test_readinto(self):
848 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
849 bufio = self.tp(rawio)
850 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000851 self.assertEqual(bufio.readinto(b), 2)
852 self.assertEqual(b, b"ab")
853 self.assertEqual(bufio.readinto(b), 2)
854 self.assertEqual(b, b"cd")
855 self.assertEqual(bufio.readinto(b), 2)
856 self.assertEqual(b, b"ef")
857 self.assertEqual(bufio.readinto(b), 1)
858 self.assertEqual(b, b"gf")
859 self.assertEqual(bufio.readinto(b), 0)
860 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +0200861 rawio = self.MockRawIO((b"abc", None))
862 bufio = self.tp(rawio)
863 self.assertEqual(bufio.readinto(b), 2)
864 self.assertEqual(b, b"ab")
865 self.assertEqual(bufio.readinto(b), 1)
866 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000867
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000868 def test_readlines(self):
869 def bufio():
870 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
871 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000872 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
873 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
874 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000875
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000876 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000877 data = b"abcdefghi"
878 dlen = len(data)
879
880 tests = [
881 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
882 [ 100, [ 3, 3, 3], [ dlen ] ],
883 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
884 ]
885
886 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000887 rawio = self.MockFileIO(data)
888 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000889 pos = 0
890 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000891 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000892 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000893 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000894 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000895
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000896 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000897 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000898 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
899 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000900 self.assertEqual(b"abcd", bufio.read(6))
901 self.assertEqual(b"e", bufio.read(1))
902 self.assertEqual(b"fg", bufio.read())
903 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200904 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000905 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000906
Victor Stinnera80987f2011-05-25 22:47:16 +0200907 rawio = self.MockRawIO((b"a", None, None))
908 self.assertEqual(b"a", rawio.readall())
909 self.assertIsNone(rawio.readall())
910
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000911 def test_read_past_eof(self):
912 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
913 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000914
Ezio Melottib3aedd42010-11-20 19:04:17 +0000915 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000916
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000917 def test_read_all(self):
918 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
919 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000920
Ezio Melottib3aedd42010-11-20 19:04:17 +0000921 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000922
Victor Stinner45df8202010-04-28 22:31:17 +0000923 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000924 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000925 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000926 try:
927 # Write out many bytes with exactly the same number of 0's,
928 # 1's... 255's. This will help us check that concurrent reading
929 # doesn't duplicate or forget contents.
930 N = 1000
931 l = list(range(256)) * N
932 random.shuffle(l)
933 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000934 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000935 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000936 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000937 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000938 errors = []
939 results = []
940 def f():
941 try:
942 # Intra-buffer read then buffer-flushing read
943 for n in cycle([1, 19]):
944 s = bufio.read(n)
945 if not s:
946 break
947 # list.append() is atomic
948 results.append(s)
949 except Exception as e:
950 errors.append(e)
951 raise
952 threads = [threading.Thread(target=f) for x in range(20)]
953 for t in threads:
954 t.start()
955 time.sleep(0.02) # yield
956 for t in threads:
957 t.join()
958 self.assertFalse(errors,
959 "the following exceptions were caught: %r" % errors)
960 s = b''.join(results)
961 for i in range(256):
962 c = bytes(bytearray([i]))
963 self.assertEqual(s.count(c), N)
964 finally:
965 support.unlink(support.TESTFN)
966
Antoine Pitrou1e44fec2011-10-04 12:26:20 +0200967 def test_unseekable(self):
968 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
969 self.assertRaises(self.UnsupportedOperation, bufio.tell)
970 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
971 bufio.read(1)
972 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
973 self.assertRaises(self.UnsupportedOperation, bufio.tell)
974
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000975 def test_misbehaved_io(self):
976 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
977 bufio = self.tp(rawio)
978 self.assertRaises(IOError, bufio.seek, 0)
979 self.assertRaises(IOError, bufio.tell)
980
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000981 def test_no_extraneous_read(self):
982 # Issue #9550; when the raw IO object has satisfied the read request,
983 # we should not issue any additional reads, otherwise it may block
984 # (e.g. socket).
985 bufsize = 16
986 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
987 rawio = self.MockRawIO([b"x" * n])
988 bufio = self.tp(rawio, bufsize)
989 self.assertEqual(bufio.read(n), b"x" * n)
990 # Simple case: one raw read is enough to satisfy the request.
991 self.assertEqual(rawio._extraneous_reads, 0,
992 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
993 # A more complex case where two raw reads are needed to satisfy
994 # the request.
995 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
996 bufio = self.tp(rawio, bufsize)
997 self.assertEqual(bufio.read(n), b"x" * n)
998 self.assertEqual(rawio._extraneous_reads, 0,
999 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1000
1001
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001002class CBufferedReaderTest(BufferedReaderTest):
1003 tp = io.BufferedReader
1004
1005 def test_constructor(self):
1006 BufferedReaderTest.test_constructor(self)
1007 # The allocation can succeed on 32-bit builds, e.g. with more
1008 # than 2GB RAM and a 64-bit kernel.
1009 if sys.maxsize > 0x7FFFFFFF:
1010 rawio = self.MockRawIO()
1011 bufio = self.tp(rawio)
1012 self.assertRaises((OverflowError, MemoryError, ValueError),
1013 bufio.__init__, rawio, sys.maxsize)
1014
1015 def test_initialization(self):
1016 rawio = self.MockRawIO([b"abc"])
1017 bufio = self.tp(rawio)
1018 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1019 self.assertRaises(ValueError, bufio.read)
1020 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1021 self.assertRaises(ValueError, bufio.read)
1022 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1023 self.assertRaises(ValueError, bufio.read)
1024
1025 def test_misbehaved_io_read(self):
1026 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1027 bufio = self.tp(rawio)
1028 # _pyio.BufferedReader seems to implement reading different, so that
1029 # checking this is not so easy.
1030 self.assertRaises(IOError, bufio.read, 10)
1031
1032 def test_garbage_collection(self):
1033 # C BufferedReader objects are collected.
1034 # The Python version has __del__, so it ends into gc.garbage instead
1035 rawio = self.FileIO(support.TESTFN, "w+b")
1036 f = self.tp(rawio)
1037 f.f = f
1038 wr = weakref.ref(f)
1039 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +00001040 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001041 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001042
1043class PyBufferedReaderTest(BufferedReaderTest):
1044 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001045
Guido van Rossuma9e20242007-03-08 00:43:48 +00001046
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001047class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1048 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001049
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001050 def test_constructor(self):
1051 rawio = self.MockRawIO()
1052 bufio = self.tp(rawio)
1053 bufio.__init__(rawio)
1054 bufio.__init__(rawio, buffer_size=1024)
1055 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001056 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001057 bufio.flush()
1058 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1059 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1060 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1061 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001062 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001063 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001064 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001065
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001066 def test_detach_flush(self):
1067 raw = self.MockRawIO()
1068 buf = self.tp(raw)
1069 buf.write(b"howdy!")
1070 self.assertFalse(raw._write_stack)
1071 buf.detach()
1072 self.assertEqual(raw._write_stack, [b"howdy!"])
1073
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001074 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001075 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001076 writer = self.MockRawIO()
1077 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001078 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001079 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001080
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001081 def test_write_overflow(self):
1082 writer = self.MockRawIO()
1083 bufio = self.tp(writer, 8)
1084 contents = b"abcdefghijklmnop"
1085 for n in range(0, len(contents), 3):
1086 bufio.write(contents[n:n+3])
1087 flushed = b"".join(writer._write_stack)
1088 # At least (total - 8) bytes were implicitly flushed, perhaps more
1089 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001090 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001091
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001092 def check_writes(self, intermediate_func):
1093 # Lots of writes, test the flushed output is as expected.
1094 contents = bytes(range(256)) * 1000
1095 n = 0
1096 writer = self.MockRawIO()
1097 bufio = self.tp(writer, 13)
1098 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1099 def gen_sizes():
1100 for size in count(1):
1101 for i in range(15):
1102 yield size
1103 sizes = gen_sizes()
1104 while n < len(contents):
1105 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001106 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001107 intermediate_func(bufio)
1108 n += size
1109 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001110 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001111
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001112 def test_writes(self):
1113 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001114
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001115 def test_writes_and_flushes(self):
1116 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001117
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001118 def test_writes_and_seeks(self):
1119 def _seekabs(bufio):
1120 pos = bufio.tell()
1121 bufio.seek(pos + 1, 0)
1122 bufio.seek(pos - 1, 0)
1123 bufio.seek(pos, 0)
1124 self.check_writes(_seekabs)
1125 def _seekrel(bufio):
1126 pos = bufio.seek(0, 1)
1127 bufio.seek(+1, 1)
1128 bufio.seek(-1, 1)
1129 bufio.seek(pos, 0)
1130 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001131
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001132 def test_writes_and_truncates(self):
1133 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001134
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001135 def test_write_non_blocking(self):
1136 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001137 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001138
Ezio Melottib3aedd42010-11-20 19:04:17 +00001139 self.assertEqual(bufio.write(b"abcd"), 4)
1140 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001141 # 1 byte will be written, the rest will be buffered
1142 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001143 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001144
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001145 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1146 raw.block_on(b"0")
1147 try:
1148 bufio.write(b"opqrwxyz0123456789")
1149 except self.BlockingIOError as e:
1150 written = e.characters_written
1151 else:
1152 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001153 self.assertEqual(written, 16)
1154 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001155 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001156
Ezio Melottib3aedd42010-11-20 19:04:17 +00001157 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001158 s = raw.pop_written()
1159 # Previously buffered bytes were flushed
1160 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001161
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001162 def test_write_and_rewind(self):
1163 raw = io.BytesIO()
1164 bufio = self.tp(raw, 4)
1165 self.assertEqual(bufio.write(b"abcdef"), 6)
1166 self.assertEqual(bufio.tell(), 6)
1167 bufio.seek(0, 0)
1168 self.assertEqual(bufio.write(b"XY"), 2)
1169 bufio.seek(6, 0)
1170 self.assertEqual(raw.getvalue(), b"XYcdef")
1171 self.assertEqual(bufio.write(b"123456"), 6)
1172 bufio.flush()
1173 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001174
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001175 def test_flush(self):
1176 writer = self.MockRawIO()
1177 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001178 bufio.write(b"abc")
1179 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001180 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001181
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001182 def test_destructor(self):
1183 writer = self.MockRawIO()
1184 bufio = self.tp(writer, 8)
1185 bufio.write(b"abc")
1186 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001187 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001188 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001189
1190 def test_truncate(self):
1191 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001192 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001193 bufio = self.tp(raw, 8)
1194 bufio.write(b"abcdef")
1195 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001196 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001197 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001198 self.assertEqual(f.read(), b"abc")
1199
Victor Stinner45df8202010-04-28 22:31:17 +00001200 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001201 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001202 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001203 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001204 # Write out many bytes from many threads and test they were
1205 # all flushed.
1206 N = 1000
1207 contents = bytes(range(256)) * N
1208 sizes = cycle([1, 19])
1209 n = 0
1210 queue = deque()
1211 while n < len(contents):
1212 size = next(sizes)
1213 queue.append(contents[n:n+size])
1214 n += size
1215 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001216 # We use a real file object because it allows us to
1217 # exercise situations where the GIL is released before
1218 # writing the buffer to the raw streams. This is in addition
1219 # to concurrency issues due to switching threads in the middle
1220 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001221 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001222 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001223 errors = []
1224 def f():
1225 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001226 while True:
1227 try:
1228 s = queue.popleft()
1229 except IndexError:
1230 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001231 bufio.write(s)
1232 except Exception as e:
1233 errors.append(e)
1234 raise
1235 threads = [threading.Thread(target=f) for x in range(20)]
1236 for t in threads:
1237 t.start()
1238 time.sleep(0.02) # yield
1239 for t in threads:
1240 t.join()
1241 self.assertFalse(errors,
1242 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001243 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001244 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001245 s = f.read()
1246 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001247 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001248 finally:
1249 support.unlink(support.TESTFN)
1250
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001251 def test_misbehaved_io(self):
1252 rawio = self.MisbehavedRawIO()
1253 bufio = self.tp(rawio, 5)
1254 self.assertRaises(IOError, bufio.seek, 0)
1255 self.assertRaises(IOError, bufio.tell)
1256 self.assertRaises(IOError, bufio.write, b"abcdef")
1257
Florent Xicluna109d5732012-07-07 17:03:22 +02001258 def test_max_buffer_size_removal(self):
1259 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001260 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001261
1262
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001263class CBufferedWriterTest(BufferedWriterTest):
1264 tp = io.BufferedWriter
1265
1266 def test_constructor(self):
1267 BufferedWriterTest.test_constructor(self)
1268 # The allocation can succeed on 32-bit builds, e.g. with more
1269 # than 2GB RAM and a 64-bit kernel.
1270 if sys.maxsize > 0x7FFFFFFF:
1271 rawio = self.MockRawIO()
1272 bufio = self.tp(rawio)
1273 self.assertRaises((OverflowError, MemoryError, ValueError),
1274 bufio.__init__, rawio, sys.maxsize)
1275
1276 def test_initialization(self):
1277 rawio = self.MockRawIO()
1278 bufio = self.tp(rawio)
1279 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1280 self.assertRaises(ValueError, bufio.write, b"def")
1281 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1282 self.assertRaises(ValueError, bufio.write, b"def")
1283 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1284 self.assertRaises(ValueError, bufio.write, b"def")
1285
1286 def test_garbage_collection(self):
1287 # C BufferedWriter objects are collected, and collecting them flushes
1288 # all data to disk.
1289 # The Python version has __del__, so it ends into gc.garbage instead
1290 rawio = self.FileIO(support.TESTFN, "w+b")
1291 f = self.tp(rawio)
1292 f.write(b"123xxx")
1293 f.x = f
1294 wr = weakref.ref(f)
1295 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001296 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001297 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001298 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001299 self.assertEqual(f.read(), b"123xxx")
1300
1301
1302class PyBufferedWriterTest(BufferedWriterTest):
1303 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001304
Guido van Rossum01a27522007-03-07 01:00:12 +00001305class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001306
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001307 def test_constructor(self):
1308 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001309 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001310
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001311 def test_detach(self):
1312 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1313 self.assertRaises(self.UnsupportedOperation, pair.detach)
1314
Florent Xicluna109d5732012-07-07 17:03:22 +02001315 def test_constructor_max_buffer_size_removal(self):
1316 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001317 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001318
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001319 def test_constructor_with_not_readable(self):
1320 class NotReadable(MockRawIO):
1321 def readable(self):
1322 return False
1323
1324 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1325
1326 def test_constructor_with_not_writeable(self):
1327 class NotWriteable(MockRawIO):
1328 def writable(self):
1329 return False
1330
1331 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1332
1333 def test_read(self):
1334 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1335
1336 self.assertEqual(pair.read(3), b"abc")
1337 self.assertEqual(pair.read(1), b"d")
1338 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001339 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1340 self.assertEqual(pair.read(None), b"abc")
1341
1342 def test_readlines(self):
1343 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1344 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1345 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1346 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001347
1348 def test_read1(self):
1349 # .read1() is delegated to the underlying reader object, so this test
1350 # can be shallow.
1351 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1352
1353 self.assertEqual(pair.read1(3), b"abc")
1354
1355 def test_readinto(self):
1356 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1357
1358 data = bytearray(5)
1359 self.assertEqual(pair.readinto(data), 5)
1360 self.assertEqual(data, b"abcde")
1361
1362 def test_write(self):
1363 w = self.MockRawIO()
1364 pair = self.tp(self.MockRawIO(), w)
1365
1366 pair.write(b"abc")
1367 pair.flush()
1368 pair.write(b"def")
1369 pair.flush()
1370 self.assertEqual(w._write_stack, [b"abc", b"def"])
1371
1372 def test_peek(self):
1373 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1374
1375 self.assertTrue(pair.peek(3).startswith(b"abc"))
1376 self.assertEqual(pair.read(3), b"abc")
1377
1378 def test_readable(self):
1379 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1380 self.assertTrue(pair.readable())
1381
1382 def test_writeable(self):
1383 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1384 self.assertTrue(pair.writable())
1385
1386 def test_seekable(self):
1387 # BufferedRWPairs are never seekable, even if their readers and writers
1388 # are.
1389 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1390 self.assertFalse(pair.seekable())
1391
1392 # .flush() is delegated to the underlying writer object and has been
1393 # tested in the test_write method.
1394
1395 def test_close_and_closed(self):
1396 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1397 self.assertFalse(pair.closed)
1398 pair.close()
1399 self.assertTrue(pair.closed)
1400
1401 def test_isatty(self):
1402 class SelectableIsAtty(MockRawIO):
1403 def __init__(self, isatty):
1404 MockRawIO.__init__(self)
1405 self._isatty = isatty
1406
1407 def isatty(self):
1408 return self._isatty
1409
1410 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1411 self.assertFalse(pair.isatty())
1412
1413 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1414 self.assertTrue(pair.isatty())
1415
1416 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1417 self.assertTrue(pair.isatty())
1418
1419 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1420 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001421
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001422class CBufferedRWPairTest(BufferedRWPairTest):
1423 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001424
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001425class PyBufferedRWPairTest(BufferedRWPairTest):
1426 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001427
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001428
1429class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1430 read_mode = "rb+"
1431 write_mode = "wb+"
1432
1433 def test_constructor(self):
1434 BufferedReaderTest.test_constructor(self)
1435 BufferedWriterTest.test_constructor(self)
1436
1437 def test_read_and_write(self):
1438 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001439 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001440
1441 self.assertEqual(b"as", rw.read(2))
1442 rw.write(b"ddd")
1443 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001444 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001445 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001446 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001447
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001448 def test_seek_and_tell(self):
1449 raw = self.BytesIO(b"asdfghjkl")
1450 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001451
Ezio Melottib3aedd42010-11-20 19:04:17 +00001452 self.assertEqual(b"as", rw.read(2))
1453 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001454 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001455 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001456
Antoine Pitroue05565e2011-08-20 14:39:23 +02001457 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001458 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001459 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001460 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001461 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001462 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001463 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001464 self.assertEqual(7, rw.tell())
1465 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001466 rw.flush()
1467 self.assertEqual(b"asdf123fl", raw.getvalue())
1468
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001469 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001470
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001471 def check_flush_and_read(self, read_func):
1472 raw = self.BytesIO(b"abcdefghi")
1473 bufio = self.tp(raw)
1474
Ezio Melottib3aedd42010-11-20 19:04:17 +00001475 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001476 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001477 self.assertEqual(b"ef", read_func(bufio, 2))
1478 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001479 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001480 self.assertEqual(6, bufio.tell())
1481 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001482 raw.seek(0, 0)
1483 raw.write(b"XYZ")
1484 # flush() resets the read buffer
1485 bufio.flush()
1486 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001487 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001488
1489 def test_flush_and_read(self):
1490 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1491
1492 def test_flush_and_readinto(self):
1493 def _readinto(bufio, n=-1):
1494 b = bytearray(n if n >= 0 else 9999)
1495 n = bufio.readinto(b)
1496 return bytes(b[:n])
1497 self.check_flush_and_read(_readinto)
1498
1499 def test_flush_and_peek(self):
1500 def _peek(bufio, n=-1):
1501 # This relies on the fact that the buffer can contain the whole
1502 # raw stream, otherwise peek() can return less.
1503 b = bufio.peek(n)
1504 if n != -1:
1505 b = b[:n]
1506 bufio.seek(len(b), 1)
1507 return b
1508 self.check_flush_and_read(_peek)
1509
1510 def test_flush_and_write(self):
1511 raw = self.BytesIO(b"abcdefghi")
1512 bufio = self.tp(raw)
1513
1514 bufio.write(b"123")
1515 bufio.flush()
1516 bufio.write(b"45")
1517 bufio.flush()
1518 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001519 self.assertEqual(b"12345fghi", raw.getvalue())
1520 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001521
1522 def test_threads(self):
1523 BufferedReaderTest.test_threads(self)
1524 BufferedWriterTest.test_threads(self)
1525
1526 def test_writes_and_peek(self):
1527 def _peek(bufio):
1528 bufio.peek(1)
1529 self.check_writes(_peek)
1530 def _peek(bufio):
1531 pos = bufio.tell()
1532 bufio.seek(-1, 1)
1533 bufio.peek(1)
1534 bufio.seek(pos, 0)
1535 self.check_writes(_peek)
1536
1537 def test_writes_and_reads(self):
1538 def _read(bufio):
1539 bufio.seek(-1, 1)
1540 bufio.read(1)
1541 self.check_writes(_read)
1542
1543 def test_writes_and_read1s(self):
1544 def _read1(bufio):
1545 bufio.seek(-1, 1)
1546 bufio.read1(1)
1547 self.check_writes(_read1)
1548
1549 def test_writes_and_readintos(self):
1550 def _read(bufio):
1551 bufio.seek(-1, 1)
1552 bufio.readinto(bytearray(1))
1553 self.check_writes(_read)
1554
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001555 def test_write_after_readahead(self):
1556 # Issue #6629: writing after the buffer was filled by readahead should
1557 # first rewind the raw stream.
1558 for overwrite_size in [1, 5]:
1559 raw = self.BytesIO(b"A" * 10)
1560 bufio = self.tp(raw, 4)
1561 # Trigger readahead
1562 self.assertEqual(bufio.read(1), b"A")
1563 self.assertEqual(bufio.tell(), 1)
1564 # Overwriting should rewind the raw stream if it needs so
1565 bufio.write(b"B" * overwrite_size)
1566 self.assertEqual(bufio.tell(), overwrite_size + 1)
1567 # If the write size was smaller than the buffer size, flush() and
1568 # check that rewind happens.
1569 bufio.flush()
1570 self.assertEqual(bufio.tell(), overwrite_size + 1)
1571 s = raw.getvalue()
1572 self.assertEqual(s,
1573 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1574
Antoine Pitrou7c404892011-05-13 00:13:33 +02001575 def test_write_rewind_write(self):
1576 # Various combinations of reading / writing / seeking backwards / writing again
1577 def mutate(bufio, pos1, pos2):
1578 assert pos2 >= pos1
1579 # Fill the buffer
1580 bufio.seek(pos1)
1581 bufio.read(pos2 - pos1)
1582 bufio.write(b'\x02')
1583 # This writes earlier than the previous write, but still inside
1584 # the buffer.
1585 bufio.seek(pos1)
1586 bufio.write(b'\x01')
1587
1588 b = b"\x80\x81\x82\x83\x84"
1589 for i in range(0, len(b)):
1590 for j in range(i, len(b)):
1591 raw = self.BytesIO(b)
1592 bufio = self.tp(raw, 100)
1593 mutate(bufio, i, j)
1594 bufio.flush()
1595 expected = bytearray(b)
1596 expected[j] = 2
1597 expected[i] = 1
1598 self.assertEqual(raw.getvalue(), expected,
1599 "failed result for i=%d, j=%d" % (i, j))
1600
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001601 def test_truncate_after_read_or_write(self):
1602 raw = self.BytesIO(b"A" * 10)
1603 bufio = self.tp(raw, 100)
1604 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1605 self.assertEqual(bufio.truncate(), 2)
1606 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1607 self.assertEqual(bufio.truncate(), 4)
1608
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001609 def test_misbehaved_io(self):
1610 BufferedReaderTest.test_misbehaved_io(self)
1611 BufferedWriterTest.test_misbehaved_io(self)
1612
Antoine Pitroue05565e2011-08-20 14:39:23 +02001613 def test_interleaved_read_write(self):
1614 # Test for issue #12213
1615 with self.BytesIO(b'abcdefgh') as raw:
1616 with self.tp(raw, 100) as f:
1617 f.write(b"1")
1618 self.assertEqual(f.read(1), b'b')
1619 f.write(b'2')
1620 self.assertEqual(f.read1(1), b'd')
1621 f.write(b'3')
1622 buf = bytearray(1)
1623 f.readinto(buf)
1624 self.assertEqual(buf, b'f')
1625 f.write(b'4')
1626 self.assertEqual(f.peek(1), b'h')
1627 f.flush()
1628 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1629
1630 with self.BytesIO(b'abc') as raw:
1631 with self.tp(raw, 100) as f:
1632 self.assertEqual(f.read(1), b'a')
1633 f.write(b"2")
1634 self.assertEqual(f.read(1), b'c')
1635 f.flush()
1636 self.assertEqual(raw.getvalue(), b'a2c')
1637
1638 def test_interleaved_readline_write(self):
1639 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1640 with self.tp(raw) as f:
1641 f.write(b'1')
1642 self.assertEqual(f.readline(), b'b\n')
1643 f.write(b'2')
1644 self.assertEqual(f.readline(), b'def\n')
1645 f.write(b'3')
1646 self.assertEqual(f.readline(), b'\n')
1647 f.flush()
1648 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1649
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001650 # You can't construct a BufferedRandom over a non-seekable stream.
1651 test_unseekable = None
1652
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001653class CBufferedRandomTest(BufferedRandomTest):
1654 tp = io.BufferedRandom
1655
1656 def test_constructor(self):
1657 BufferedRandomTest.test_constructor(self)
1658 # The allocation can succeed on 32-bit builds, e.g. with more
1659 # than 2GB RAM and a 64-bit kernel.
1660 if sys.maxsize > 0x7FFFFFFF:
1661 rawio = self.MockRawIO()
1662 bufio = self.tp(rawio)
1663 self.assertRaises((OverflowError, MemoryError, ValueError),
1664 bufio.__init__, rawio, sys.maxsize)
1665
1666 def test_garbage_collection(self):
1667 CBufferedReaderTest.test_garbage_collection(self)
1668 CBufferedWriterTest.test_garbage_collection(self)
1669
1670class PyBufferedRandomTest(BufferedRandomTest):
1671 tp = pyio.BufferedRandom
1672
1673
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001674# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1675# properties:
1676# - A single output character can correspond to many bytes of input.
1677# - The number of input bytes to complete the character can be
1678# undetermined until the last input byte is received.
1679# - The number of input bytes can vary depending on previous input.
1680# - A single input byte can correspond to many characters of output.
1681# - The number of output characters can be undetermined until the
1682# last input byte is received.
1683# - The number of output characters can vary depending on previous input.
1684
1685class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1686 """
1687 For testing seek/tell behavior with a stateful, buffering decoder.
1688
1689 Input is a sequence of words. Words may be fixed-length (length set
1690 by input) or variable-length (period-terminated). In variable-length
1691 mode, extra periods are ignored. Possible words are:
1692 - 'i' followed by a number sets the input length, I (maximum 99).
1693 When I is set to 0, words are space-terminated.
1694 - 'o' followed by a number sets the output length, O (maximum 99).
1695 - Any other word is converted into a word followed by a period on
1696 the output. The output word consists of the input word truncated
1697 or padded out with hyphens to make its length equal to O. If O
1698 is 0, the word is output verbatim without truncating or padding.
1699 I and O are initially set to 1. When I changes, any buffered input is
1700 re-scanned according to the new I. EOF also terminates the last word.
1701 """
1702
1703 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001704 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001705 self.reset()
1706
1707 def __repr__(self):
1708 return '<SID %x>' % id(self)
1709
1710 def reset(self):
1711 self.i = 1
1712 self.o = 1
1713 self.buffer = bytearray()
1714
1715 def getstate(self):
1716 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1717 return bytes(self.buffer), i*100 + o
1718
1719 def setstate(self, state):
1720 buffer, io = state
1721 self.buffer = bytearray(buffer)
1722 i, o = divmod(io, 100)
1723 self.i, self.o = i ^ 1, o ^ 1
1724
1725 def decode(self, input, final=False):
1726 output = ''
1727 for b in input:
1728 if self.i == 0: # variable-length, terminated with period
1729 if b == ord('.'):
1730 if self.buffer:
1731 output += self.process_word()
1732 else:
1733 self.buffer.append(b)
1734 else: # fixed-length, terminate after self.i bytes
1735 self.buffer.append(b)
1736 if len(self.buffer) == self.i:
1737 output += self.process_word()
1738 if final and self.buffer: # EOF terminates the last word
1739 output += self.process_word()
1740 return output
1741
1742 def process_word(self):
1743 output = ''
1744 if self.buffer[0] == ord('i'):
1745 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1746 elif self.buffer[0] == ord('o'):
1747 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1748 else:
1749 output = self.buffer.decode('ascii')
1750 if len(output) < self.o:
1751 output += '-'*self.o # pad out with hyphens
1752 if self.o:
1753 output = output[:self.o] # truncate to output length
1754 output += '.'
1755 self.buffer = bytearray()
1756 return output
1757
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001758 codecEnabled = False
1759
1760 @classmethod
1761 def lookupTestDecoder(cls, name):
1762 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001763 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001764 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001765 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001766 incrementalencoder=None,
1767 streamreader=None, streamwriter=None,
1768 incrementaldecoder=cls)
1769
1770# Register the previous decoder for testing.
1771# Disabled by default, tests will enable it.
1772codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1773
1774
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001775class StatefulIncrementalDecoderTest(unittest.TestCase):
1776 """
1777 Make sure the StatefulIncrementalDecoder actually works.
1778 """
1779
1780 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001781 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001782 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001783 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001784 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001785 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001786 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001787 # I=0, O=6 (variable-length input, fixed-length output)
1788 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1789 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001790 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001791 # I=6, O=3 (fixed-length input > fixed-length output)
1792 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1793 # I=0, then 3; O=29, then 15 (with longer output)
1794 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1795 'a----------------------------.' +
1796 'b----------------------------.' +
1797 'cde--------------------------.' +
1798 'abcdefghijabcde.' +
1799 'a.b------------.' +
1800 '.c.------------.' +
1801 'd.e------------.' +
1802 'k--------------.' +
1803 'l--------------.' +
1804 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001805 ]
1806
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001807 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001808 # Try a few one-shot test cases.
1809 for input, eof, output in self.test_cases:
1810 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001811 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001812
1813 # Also test an unfinished decode, followed by forcing EOF.
1814 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001815 self.assertEqual(d.decode(b'oiabcd'), '')
1816 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001817
1818class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001819
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001820 def setUp(self):
1821 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1822 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001823 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001824
Guido van Rossumd0712812007-04-11 16:32:43 +00001825 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001826 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001827
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001828 def test_constructor(self):
1829 r = self.BytesIO(b"\xc3\xa9\n\n")
1830 b = self.BufferedReader(r, 1000)
1831 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001832 t.__init__(b, encoding="latin-1", newline="\r\n")
1833 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001834 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001835 t.__init__(b, encoding="utf-8", line_buffering=True)
1836 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001837 self.assertEqual(t.line_buffering, True)
1838 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001839 self.assertRaises(TypeError, t.__init__, b, newline=42)
1840 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1841
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001842 def test_detach(self):
1843 r = self.BytesIO()
1844 b = self.BufferedWriter(r)
1845 t = self.TextIOWrapper(b)
1846 self.assertIs(t.detach(), b)
1847
1848 t = self.TextIOWrapper(b, encoding="ascii")
1849 t.write("howdy")
1850 self.assertFalse(r.getvalue())
1851 t.detach()
1852 self.assertEqual(r.getvalue(), b"howdy")
1853 self.assertRaises(ValueError, t.detach)
1854
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001855 def test_repr(self):
1856 raw = self.BytesIO("hello".encode("utf-8"))
1857 b = self.BufferedReader(raw)
1858 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001859 modname = self.TextIOWrapper.__module__
1860 self.assertEqual(repr(t),
1861 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1862 raw.name = "dummy"
1863 self.assertEqual(repr(t),
1864 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001865 t.mode = "r"
1866 self.assertEqual(repr(t),
1867 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001868 raw.name = b"dummy"
1869 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001870 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001871
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001872 def test_line_buffering(self):
1873 r = self.BytesIO()
1874 b = self.BufferedWriter(r, 1000)
1875 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001876 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001877 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001878 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001879 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001880 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001881 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001882
Victor Stinnerf86a5e82012-06-05 13:43:22 +02001883 def test_default_encoding(self):
1884 old_environ = dict(os.environ)
1885 try:
1886 # try to get a user preferred encoding different than the current
1887 # locale encoding to check that TextIOWrapper() uses the current
1888 # locale encoding and not the user preferred encoding
1889 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
1890 if key in os.environ:
1891 del os.environ[key]
1892
1893 current_locale_encoding = locale.getpreferredencoding(False)
1894 b = self.BytesIO()
1895 t = self.TextIOWrapper(b)
1896 self.assertEqual(t.encoding, current_locale_encoding)
1897 finally:
1898 os.environ.clear()
1899 os.environ.update(old_environ)
1900
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001901 def test_encoding(self):
1902 # Check the encoding attribute is always set, and valid
1903 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001904 t = self.TextIOWrapper(b, encoding="utf-8")
1905 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001906 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001907 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001908 codecs.lookup(t.encoding)
1909
1910 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001911 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001912 b = self.BytesIO(b"abc\n\xff\n")
1913 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001914 self.assertRaises(UnicodeError, t.read)
1915 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001916 b = self.BytesIO(b"abc\n\xff\n")
1917 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001918 self.assertRaises(UnicodeError, t.read)
1919 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001920 b = self.BytesIO(b"abc\n\xff\n")
1921 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001922 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001923 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001924 b = self.BytesIO(b"abc\n\xff\n")
1925 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001926 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001927
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001928 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001929 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001930 b = self.BytesIO()
1931 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001932 self.assertRaises(UnicodeError, t.write, "\xff")
1933 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001934 b = self.BytesIO()
1935 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001936 self.assertRaises(UnicodeError, t.write, "\xff")
1937 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001938 b = self.BytesIO()
1939 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001940 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001941 t.write("abc\xffdef\n")
1942 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001943 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001944 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001945 b = self.BytesIO()
1946 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001947 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001948 t.write("abc\xffdef\n")
1949 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001950 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001951
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001952 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001953 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1954
1955 tests = [
1956 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001957 [ '', input_lines ],
1958 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1959 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1960 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001961 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001962 encodings = (
1963 'utf-8', 'latin-1',
1964 'utf-16', 'utf-16-le', 'utf-16-be',
1965 'utf-32', 'utf-32-le', 'utf-32-be',
1966 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001967
Guido van Rossum8358db22007-08-18 21:39:55 +00001968 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001969 # character in TextIOWrapper._pending_line.
1970 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001971 # XXX: str.encode() should return bytes
1972 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001973 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001974 for bufsize in range(1, 10):
1975 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001976 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1977 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001978 encoding=encoding)
1979 if do_reads:
1980 got_lines = []
1981 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001982 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001983 if c2 == '':
1984 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00001985 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001986 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001987 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001988 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001989
1990 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001991 self.assertEqual(got_line, exp_line)
1992 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001993
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001994 def test_newlines_input(self):
1995 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001996 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1997 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03001998 (None, normalized.decode("ascii").splitlines(keepends=True)),
1999 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002000 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2001 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2002 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002003 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002004 buf = self.BytesIO(testdata)
2005 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002006 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002007 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002008 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002009
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002010 def test_newlines_output(self):
2011 testdict = {
2012 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2013 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2014 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2015 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2016 }
2017 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2018 for newline, expected in tests:
2019 buf = self.BytesIO()
2020 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2021 txt.write("AAA\nB")
2022 txt.write("BB\nCCC\n")
2023 txt.write("X\rY\r\nZ")
2024 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002025 self.assertEqual(buf.closed, False)
2026 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002027
2028 def test_destructor(self):
2029 l = []
2030 base = self.BytesIO
2031 class MyBytesIO(base):
2032 def close(self):
2033 l.append(self.getvalue())
2034 base.close(self)
2035 b = MyBytesIO()
2036 t = self.TextIOWrapper(b, encoding="ascii")
2037 t.write("abc")
2038 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002039 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002040 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002041
2042 def test_override_destructor(self):
2043 record = []
2044 class MyTextIO(self.TextIOWrapper):
2045 def __del__(self):
2046 record.append(1)
2047 try:
2048 f = super().__del__
2049 except AttributeError:
2050 pass
2051 else:
2052 f()
2053 def close(self):
2054 record.append(2)
2055 super().close()
2056 def flush(self):
2057 record.append(3)
2058 super().flush()
2059 b = self.BytesIO()
2060 t = MyTextIO(b, encoding="ascii")
2061 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002062 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002063 self.assertEqual(record, [1, 2, 3])
2064
2065 def test_error_through_destructor(self):
2066 # Test that the exception state is not modified by a destructor,
2067 # even if close() fails.
2068 rawio = self.CloseFailureIO()
2069 def f():
2070 self.TextIOWrapper(rawio).xyzzy
2071 with support.captured_output("stderr") as s:
2072 self.assertRaises(AttributeError, f)
2073 s = s.getvalue().strip()
2074 if s:
2075 # The destructor *may* have printed an unraisable error, check it
2076 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002077 self.assertTrue(s.startswith("Exception IOError: "), s)
2078 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002079
Guido van Rossum9b76da62007-04-11 01:09:03 +00002080 # Systematic tests of the text I/O API
2081
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002082 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002083 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002084 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002085 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002086 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002087 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002088 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002089 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002090 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002091 self.assertEqual(f.tell(), 0)
2092 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002093 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002094 self.assertEqual(f.seek(0), 0)
2095 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002096 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002097 self.assertEqual(f.read(2), "ab")
2098 self.assertEqual(f.read(1), "c")
2099 self.assertEqual(f.read(1), "")
2100 self.assertEqual(f.read(), "")
2101 self.assertEqual(f.tell(), cookie)
2102 self.assertEqual(f.seek(0), 0)
2103 self.assertEqual(f.seek(0, 2), cookie)
2104 self.assertEqual(f.write("def"), 3)
2105 self.assertEqual(f.seek(cookie), cookie)
2106 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002107 if enc.startswith("utf"):
2108 self.multi_line_test(f, enc)
2109 f.close()
2110
2111 def multi_line_test(self, f, enc):
2112 f.seek(0)
2113 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002114 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002115 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002116 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002117 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002118 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002119 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002120 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002121 wlines.append((f.tell(), line))
2122 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002123 f.seek(0)
2124 rlines = []
2125 while True:
2126 pos = f.tell()
2127 line = f.readline()
2128 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002129 break
2130 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002131 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002132
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002133 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002134 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002135 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002136 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002137 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002138 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002139 p2 = f.tell()
2140 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002141 self.assertEqual(f.tell(), p0)
2142 self.assertEqual(f.readline(), "\xff\n")
2143 self.assertEqual(f.tell(), p1)
2144 self.assertEqual(f.readline(), "\xff\n")
2145 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002146 f.seek(0)
2147 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002148 self.assertEqual(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002149 self.assertRaises(IOError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002150 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002151 f.close()
2152
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002153 def test_seeking(self):
2154 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002155 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002156 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002157 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002158 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002159 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002160 suffix = bytes(u_suffix.encode("utf-8"))
2161 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002162 with self.open(support.TESTFN, "wb") as f:
2163 f.write(line*2)
2164 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2165 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002166 self.assertEqual(s, str(prefix, "ascii"))
2167 self.assertEqual(f.tell(), prefix_size)
2168 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002169
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002170 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002171 # Regression test for a specific bug
2172 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002173 with self.open(support.TESTFN, "wb") as f:
2174 f.write(data)
2175 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2176 f._CHUNK_SIZE # Just test that it exists
2177 f._CHUNK_SIZE = 2
2178 f.readline()
2179 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002180
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002181 def test_seek_and_tell(self):
2182 #Test seek/tell using the StatefulIncrementalDecoder.
2183 # Make test faster by doing smaller seeks
2184 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002185
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002186 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002187 """Tell/seek to various points within a data stream and ensure
2188 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002189 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002190 f.write(data)
2191 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002192 f = self.open(support.TESTFN, encoding='test_decoder')
2193 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002194 decoded = f.read()
2195 f.close()
2196
Neal Norwitze2b07052008-03-18 19:52:05 +00002197 for i in range(min_pos, len(decoded) + 1): # seek positions
2198 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002199 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002200 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002201 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002202 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002203 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002204 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002205 f.close()
2206
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002207 # Enable the test decoder.
2208 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002209
2210 # Run the tests.
2211 try:
2212 # Try each test case.
2213 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002214 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002215
2216 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002217 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2218 offset = CHUNK_SIZE - len(input)//2
2219 prefix = b'.'*offset
2220 # Don't bother seeking into the prefix (takes too long).
2221 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002222 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002223
2224 # Ensure our test decoder won't interfere with subsequent tests.
2225 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002226 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002227
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002228 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002229 data = "1234567890"
2230 tests = ("utf-16",
2231 "utf-16-le",
2232 "utf-16-be",
2233 "utf-32",
2234 "utf-32-le",
2235 "utf-32-be")
2236 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002237 buf = self.BytesIO()
2238 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002239 # Check if the BOM is written only once (see issue1753).
2240 f.write(data)
2241 f.write(data)
2242 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002243 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002244 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002245 self.assertEqual(f.read(), data * 2)
2246 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002247
Benjamin Petersona1b49012009-03-31 23:11:32 +00002248 def test_unreadable(self):
2249 class UnReadable(self.BytesIO):
2250 def readable(self):
2251 return False
2252 txt = self.TextIOWrapper(UnReadable())
2253 self.assertRaises(IOError, txt.read)
2254
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002255 def test_read_one_by_one(self):
2256 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002257 reads = ""
2258 while True:
2259 c = txt.read(1)
2260 if not c:
2261 break
2262 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002263 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002264
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002265 def test_readlines(self):
2266 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2267 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2268 txt.seek(0)
2269 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2270 txt.seek(0)
2271 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2272
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002273 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002274 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002275 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002276 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002277 reads = ""
2278 while True:
2279 c = txt.read(128)
2280 if not c:
2281 break
2282 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002283 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002284
2285 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002286 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002287
2288 # read one char at a time
2289 reads = ""
2290 while True:
2291 c = txt.read(1)
2292 if not c:
2293 break
2294 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002295 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002296
2297 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002298 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002299 txt._CHUNK_SIZE = 4
2300
2301 reads = ""
2302 while True:
2303 c = txt.read(4)
2304 if not c:
2305 break
2306 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002307 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002308
2309 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002310 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002311 txt._CHUNK_SIZE = 4
2312
2313 reads = txt.read(4)
2314 reads += txt.read(4)
2315 reads += txt.readline()
2316 reads += txt.readline()
2317 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002318 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002319
2320 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002321 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002322 txt._CHUNK_SIZE = 4
2323
2324 reads = txt.read(4)
2325 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002326 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002327
2328 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002329 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002330 txt._CHUNK_SIZE = 4
2331
2332 reads = txt.read(4)
2333 pos = txt.tell()
2334 txt.seek(0)
2335 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002336 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002337
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002338 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002339 buffer = self.BytesIO(self.testdata)
2340 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002341
2342 self.assertEqual(buffer.seekable(), txt.seekable())
2343
Antoine Pitroue4501852009-05-14 18:55:55 +00002344 def test_append_bom(self):
2345 # The BOM is not written again when appending to a non-empty file
2346 filename = support.TESTFN
2347 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2348 with self.open(filename, 'w', encoding=charset) as f:
2349 f.write('aaa')
2350 pos = f.tell()
2351 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002352 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002353
2354 with self.open(filename, 'a', encoding=charset) as f:
2355 f.write('xxx')
2356 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002357 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002358
2359 def test_seek_bom(self):
2360 # Same test, but when seeking manually
2361 filename = support.TESTFN
2362 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2363 with self.open(filename, 'w', encoding=charset) as f:
2364 f.write('aaa')
2365 pos = f.tell()
2366 with self.open(filename, 'r+', encoding=charset) as f:
2367 f.seek(pos)
2368 f.write('zzz')
2369 f.seek(0)
2370 f.write('bbb')
2371 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002372 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002373
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002374 def test_errors_property(self):
2375 with self.open(support.TESTFN, "w") as f:
2376 self.assertEqual(f.errors, "strict")
2377 with self.open(support.TESTFN, "w", errors="replace") as f:
2378 self.assertEqual(f.errors, "replace")
2379
Brett Cannon31f59292011-02-21 19:29:56 +00002380 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002381 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002382 def test_threads_write(self):
2383 # Issue6750: concurrent writes could duplicate data
2384 event = threading.Event()
2385 with self.open(support.TESTFN, "w", buffering=1) as f:
2386 def run(n):
2387 text = "Thread%03d\n" % n
2388 event.wait()
2389 f.write(text)
2390 threads = [threading.Thread(target=lambda n=x: run(n))
2391 for x in range(20)]
2392 for t in threads:
2393 t.start()
2394 time.sleep(0.02)
2395 event.set()
2396 for t in threads:
2397 t.join()
2398 with self.open(support.TESTFN) as f:
2399 content = f.read()
2400 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002401 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002402
Antoine Pitrou6be88762010-05-03 16:48:20 +00002403 def test_flush_error_on_close(self):
2404 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2405 def bad_flush():
2406 raise IOError()
2407 txt.flush = bad_flush
2408 self.assertRaises(IOError, txt.close) # exception not swallowed
2409
2410 def test_multi_close(self):
2411 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2412 txt.close()
2413 txt.close()
2414 txt.close()
2415 self.assertRaises(ValueError, txt.flush)
2416
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002417 def test_unseekable(self):
2418 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2419 self.assertRaises(self.UnsupportedOperation, txt.tell)
2420 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2421
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002422 def test_readonly_attributes(self):
2423 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2424 buf = self.BytesIO(self.testdata)
2425 with self.assertRaises(AttributeError):
2426 txt.buffer = buf
2427
Antoine Pitroue96ec682011-07-23 21:46:35 +02002428 def test_rawio(self):
2429 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2430 # that subprocess.Popen() can have the required unbuffered
2431 # semantics with universal_newlines=True.
2432 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2433 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2434 # Reads
2435 self.assertEqual(txt.read(4), 'abcd')
2436 self.assertEqual(txt.readline(), 'efghi\n')
2437 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2438
2439 def test_rawio_write_through(self):
2440 # Issue #12591: with write_through=True, writes don't need a flush
2441 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2442 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2443 write_through=True)
2444 txt.write('1')
2445 txt.write('23\n4')
2446 txt.write('5')
2447 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2448
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002449class CTextIOWrapperTest(TextIOWrapperTest):
2450
2451 def test_initialization(self):
2452 r = self.BytesIO(b"\xc3\xa9\n\n")
2453 b = self.BufferedReader(r, 1000)
2454 t = self.TextIOWrapper(b)
2455 self.assertRaises(TypeError, t.__init__, b, newline=42)
2456 self.assertRaises(ValueError, t.read)
2457 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2458 self.assertRaises(ValueError, t.read)
2459
2460 def test_garbage_collection(self):
2461 # C TextIOWrapper objects are collected, and collecting them flushes
2462 # all data to disk.
2463 # The Python version has __del__, so it ends in gc.garbage instead.
2464 rawio = io.FileIO(support.TESTFN, "wb")
2465 b = self.BufferedWriter(rawio)
2466 t = self.TextIOWrapper(b, encoding="ascii")
2467 t.write("456def")
2468 t.x = t
2469 wr = weakref.ref(t)
2470 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002471 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002472 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002473 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002474 self.assertEqual(f.read(), b"456def")
2475
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002476 def test_rwpair_cleared_before_textio(self):
2477 # Issue 13070: TextIOWrapper's finalization would crash when called
2478 # after the reference to the underlying BufferedRWPair's writer got
2479 # cleared by the GC.
2480 for i in range(1000):
2481 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2482 t1 = self.TextIOWrapper(b1, encoding="ascii")
2483 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2484 t2 = self.TextIOWrapper(b2, encoding="ascii")
2485 # circular references
2486 t1.buddy = t2
2487 t2.buddy = t1
2488 support.gc_collect()
2489
2490
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002491class PyTextIOWrapperTest(TextIOWrapperTest):
2492 pass
2493
2494
2495class IncrementalNewlineDecoderTest(unittest.TestCase):
2496
2497 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002498 # UTF-8 specific tests for a newline decoder
2499 def _check_decode(b, s, **kwargs):
2500 # We exercise getstate() / setstate() as well as decode()
2501 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002502 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002503 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002504 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002505
Antoine Pitrou180a3362008-12-14 16:36:46 +00002506 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002507
Antoine Pitrou180a3362008-12-14 16:36:46 +00002508 _check_decode(b'\xe8', "")
2509 _check_decode(b'\xa2', "")
2510 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002511
Antoine Pitrou180a3362008-12-14 16:36:46 +00002512 _check_decode(b'\xe8', "")
2513 _check_decode(b'\xa2', "")
2514 _check_decode(b'\x88', "\u8888")
2515
2516 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002517 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2518
Antoine Pitrou180a3362008-12-14 16:36:46 +00002519 decoder.reset()
2520 _check_decode(b'\n', "\n")
2521 _check_decode(b'\r', "")
2522 _check_decode(b'', "\n", final=True)
2523 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002524
Antoine Pitrou180a3362008-12-14 16:36:46 +00002525 _check_decode(b'\r', "")
2526 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002527
Antoine Pitrou180a3362008-12-14 16:36:46 +00002528 _check_decode(b'\r\r\n', "\n\n")
2529 _check_decode(b'\r', "")
2530 _check_decode(b'\r', "\n")
2531 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002532
Antoine Pitrou180a3362008-12-14 16:36:46 +00002533 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2534 _check_decode(b'\xe8\xa2\x88', "\u8888")
2535 _check_decode(b'\n', "\n")
2536 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2537 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002538
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002539 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002540 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002541 if encoding is not None:
2542 encoder = codecs.getincrementalencoder(encoding)()
2543 def _decode_bytewise(s):
2544 # Decode one byte at a time
2545 for b in encoder.encode(s):
2546 result.append(decoder.decode(bytes([b])))
2547 else:
2548 encoder = None
2549 def _decode_bytewise(s):
2550 # Decode one char at a time
2551 for c in s:
2552 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002553 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002554 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002555 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002556 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002557 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002558 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002559 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002560 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002561 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002562 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002563 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002564 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002565 input = "abc"
2566 if encoder is not None:
2567 encoder.reset()
2568 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002569 self.assertEqual(decoder.decode(input), "abc")
2570 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002571
2572 def test_newline_decoder(self):
2573 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002574 # None meaning the IncrementalNewlineDecoder takes unicode input
2575 # rather than bytes input
2576 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002577 'utf-16', 'utf-16-le', 'utf-16-be',
2578 'utf-32', 'utf-32-le', 'utf-32-be',
2579 )
2580 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002581 decoder = enc and codecs.getincrementaldecoder(enc)()
2582 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2583 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002584 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002585 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2586 self.check_newline_decoding_utf8(decoder)
2587
Antoine Pitrou66913e22009-03-06 23:40:56 +00002588 def test_newline_bytes(self):
2589 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2590 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002591 self.assertEqual(dec.newlines, None)
2592 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2593 self.assertEqual(dec.newlines, None)
2594 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2595 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002596 dec = self.IncrementalNewlineDecoder(None, translate=False)
2597 _check(dec)
2598 dec = self.IncrementalNewlineDecoder(None, translate=True)
2599 _check(dec)
2600
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002601class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2602 pass
2603
2604class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2605 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002606
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002607
Guido van Rossum01a27522007-03-07 01:00:12 +00002608# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002609
Guido van Rossum5abbf752007-08-27 17:39:33 +00002610class MiscIOTest(unittest.TestCase):
2611
Barry Warsaw40e82462008-11-20 20:14:50 +00002612 def tearDown(self):
2613 support.unlink(support.TESTFN)
2614
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002615 def test___all__(self):
2616 for name in self.io.__all__:
2617 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002618 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002619 if name == "open":
2620 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002621 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002622 self.assertTrue(issubclass(obj, Exception), name)
2623 elif not name.startswith("SEEK_"):
2624 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002625
Barry Warsaw40e82462008-11-20 20:14:50 +00002626 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002627 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002628 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002629 f.close()
2630
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002631 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002632 self.assertEqual(f.name, support.TESTFN)
2633 self.assertEqual(f.buffer.name, support.TESTFN)
2634 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2635 self.assertEqual(f.mode, "U")
2636 self.assertEqual(f.buffer.mode, "rb")
2637 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002638 f.close()
2639
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002640 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002641 self.assertEqual(f.mode, "w+")
2642 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2643 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002644
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002645 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002646 self.assertEqual(g.mode, "wb")
2647 self.assertEqual(g.raw.mode, "wb")
2648 self.assertEqual(g.name, f.fileno())
2649 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002650 f.close()
2651 g.close()
2652
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002653 def test_io_after_close(self):
2654 for kwargs in [
2655 {"mode": "w"},
2656 {"mode": "wb"},
2657 {"mode": "w", "buffering": 1},
2658 {"mode": "w", "buffering": 2},
2659 {"mode": "wb", "buffering": 0},
2660 {"mode": "r"},
2661 {"mode": "rb"},
2662 {"mode": "r", "buffering": 1},
2663 {"mode": "r", "buffering": 2},
2664 {"mode": "rb", "buffering": 0},
2665 {"mode": "w+"},
2666 {"mode": "w+b"},
2667 {"mode": "w+", "buffering": 1},
2668 {"mode": "w+", "buffering": 2},
2669 {"mode": "w+b", "buffering": 0},
2670 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002671 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002672 f.close()
2673 self.assertRaises(ValueError, f.flush)
2674 self.assertRaises(ValueError, f.fileno)
2675 self.assertRaises(ValueError, f.isatty)
2676 self.assertRaises(ValueError, f.__iter__)
2677 if hasattr(f, "peek"):
2678 self.assertRaises(ValueError, f.peek, 1)
2679 self.assertRaises(ValueError, f.read)
2680 if hasattr(f, "read1"):
2681 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002682 if hasattr(f, "readall"):
2683 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002684 if hasattr(f, "readinto"):
2685 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2686 self.assertRaises(ValueError, f.readline)
2687 self.assertRaises(ValueError, f.readlines)
2688 self.assertRaises(ValueError, f.seek, 0)
2689 self.assertRaises(ValueError, f.tell)
2690 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002691 self.assertRaises(ValueError, f.write,
2692 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002693 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002694 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002695
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002696 def test_blockingioerror(self):
2697 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002698 class C(str):
2699 pass
2700 c = C("")
2701 b = self.BlockingIOError(1, c)
2702 c.b = b
2703 b.c = c
2704 wr = weakref.ref(c)
2705 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002706 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002707 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002708
2709 def test_abcs(self):
2710 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002711 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2712 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2713 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2714 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002715
2716 def _check_abc_inheritance(self, abcmodule):
2717 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002718 self.assertIsInstance(f, abcmodule.IOBase)
2719 self.assertIsInstance(f, abcmodule.RawIOBase)
2720 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2721 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002722 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002723 self.assertIsInstance(f, abcmodule.IOBase)
2724 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2725 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2726 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002727 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002728 self.assertIsInstance(f, abcmodule.IOBase)
2729 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2730 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2731 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002732
2733 def test_abc_inheritance(self):
2734 # Test implementations inherit from their respective ABCs
2735 self._check_abc_inheritance(self)
2736
2737 def test_abc_inheritance_official(self):
2738 # Test implementations inherit from the official ABCs of the
2739 # baseline "io" module.
2740 self._check_abc_inheritance(io)
2741
Antoine Pitroue033e062010-10-29 10:38:18 +00002742 def _check_warn_on_dealloc(self, *args, **kwargs):
2743 f = open(*args, **kwargs)
2744 r = repr(f)
2745 with self.assertWarns(ResourceWarning) as cm:
2746 f = None
2747 support.gc_collect()
2748 self.assertIn(r, str(cm.warning.args[0]))
2749
2750 def test_warn_on_dealloc(self):
2751 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2752 self._check_warn_on_dealloc(support.TESTFN, "wb")
2753 self._check_warn_on_dealloc(support.TESTFN, "w")
2754
2755 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2756 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002757 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002758 for fd in fds:
2759 try:
2760 os.close(fd)
2761 except EnvironmentError as e:
2762 if e.errno != errno.EBADF:
2763 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002764 self.addCleanup(cleanup_fds)
2765 r, w = os.pipe()
2766 fds += r, w
2767 self._check_warn_on_dealloc(r, *args, **kwargs)
2768 # When using closefd=False, there's no warning
2769 r, w = os.pipe()
2770 fds += r, w
2771 with warnings.catch_warnings(record=True) as recorded:
2772 open(r, *args, closefd=False, **kwargs)
2773 support.gc_collect()
2774 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002775
2776 def test_warn_on_dealloc_fd(self):
2777 self._check_warn_on_dealloc_fd("rb", buffering=0)
2778 self._check_warn_on_dealloc_fd("rb")
2779 self._check_warn_on_dealloc_fd("r")
2780
2781
Antoine Pitrou243757e2010-11-05 21:15:39 +00002782 def test_pickling(self):
2783 # Pickling file objects is forbidden
2784 for kwargs in [
2785 {"mode": "w"},
2786 {"mode": "wb"},
2787 {"mode": "wb", "buffering": 0},
2788 {"mode": "r"},
2789 {"mode": "rb"},
2790 {"mode": "rb", "buffering": 0},
2791 {"mode": "w+"},
2792 {"mode": "w+b"},
2793 {"mode": "w+b", "buffering": 0},
2794 ]:
2795 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2796 with self.open(support.TESTFN, **kwargs) as f:
2797 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2798
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002799 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2800 def test_nonblock_pipe_write_bigbuf(self):
2801 self._test_nonblock_pipe_write(16*1024)
2802
2803 @unittest.skipUnless(fcntl, 'fcntl required for this test')
2804 def test_nonblock_pipe_write_smallbuf(self):
2805 self._test_nonblock_pipe_write(1024)
2806
2807 def _set_non_blocking(self, fd):
2808 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2809 self.assertNotEqual(flags, -1)
2810 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2811 self.assertEqual(res, 0)
2812
2813 def _test_nonblock_pipe_write(self, bufsize):
2814 sent = []
2815 received = []
2816 r, w = os.pipe()
2817 self._set_non_blocking(r)
2818 self._set_non_blocking(w)
2819
2820 # To exercise all code paths in the C implementation we need
2821 # to play with buffer sizes. For instance, if we choose a
2822 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2823 # then we will never get a partial write of the buffer.
2824 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2825 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2826
2827 with rf, wf:
2828 for N in 9999, 73, 7574:
2829 try:
2830 i = 0
2831 while True:
2832 msg = bytes([i % 26 + 97]) * N
2833 sent.append(msg)
2834 wf.write(msg)
2835 i += 1
2836
2837 except self.BlockingIOError as e:
2838 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01002839 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002840 sent[-1] = sent[-1][:e.characters_written]
2841 received.append(rf.read())
2842 msg = b'BLOCKED'
2843 wf.write(msg)
2844 sent.append(msg)
2845
2846 while True:
2847 try:
2848 wf.flush()
2849 break
2850 except self.BlockingIOError as e:
2851 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01002852 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01002853 self.assertEqual(e.characters_written, 0)
2854 received.append(rf.read())
2855
2856 received += iter(rf.read, None)
2857
2858 sent, received = b''.join(sent), b''.join(received)
2859 self.assertTrue(sent == received)
2860 self.assertTrue(wf.closed)
2861 self.assertTrue(rf.closed)
2862
Charles-François Natalidc3044c2012-01-09 22:40:02 +01002863 def test_create_fail(self):
2864 # 'x' mode fails if file is existing
2865 with self.open(support.TESTFN, 'w'):
2866 pass
2867 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
2868
2869 def test_create_writes(self):
2870 # 'x' mode opens for writing
2871 with self.open(support.TESTFN, 'xb') as f:
2872 f.write(b"spam")
2873 with self.open(support.TESTFN, 'rb') as f:
2874 self.assertEqual(b"spam", f.read())
2875
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002876class CMiscIOTest(MiscIOTest):
2877 io = io
2878
2879class PyMiscIOTest(MiscIOTest):
2880 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002881
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002882
2883@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2884class SignalsTest(unittest.TestCase):
2885
2886 def setUp(self):
2887 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2888
2889 def tearDown(self):
2890 signal.signal(signal.SIGALRM, self.oldalrm)
2891
2892 def alarm_interrupt(self, sig, frame):
2893 1/0
2894
2895 @unittest.skipUnless(threading, 'Threading required for this test.')
2896 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2897 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00002898 invokes the signal handler, and bubbles up the exception raised
2899 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002900 read_results = []
2901 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02002902 if hasattr(signal, 'pthread_sigmask'):
2903 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002904 s = os.read(r, 1)
2905 read_results.append(s)
2906 t = threading.Thread(target=_read)
2907 t.daemon = True
2908 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00002909 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002910 try:
2911 wio = self.io.open(w, **fdopen_kwargs)
2912 t.start()
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07002913 signal.alarm(1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002914 # Fill the pipe enough that the write will be blocking.
2915 # It will be interrupted by the timer armed above. Since the
2916 # other thread has read one byte, the low-level write will
2917 # return with a successful (partial) result rather than an EINTR.
2918 # The buffered IO layer must check for pending signal
2919 # handlers, which in this case will invoke alarm_interrupt().
2920 self.assertRaises(ZeroDivisionError,
Charles-François Natali2d517212011-05-29 16:36:44 +02002921 wio.write, item * (support.PIPE_MAX_SIZE // len(item)))
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002922 t.join()
2923 # We got one byte, get another one and check that it isn't a
2924 # repeat of the first one.
2925 read_results.append(os.read(r, 1))
2926 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2927 finally:
2928 os.close(w)
2929 os.close(r)
2930 # This is deliberate. If we didn't close the file descriptor
2931 # before closing wio, wio would try to flush its internal
2932 # buffer, and block again.
2933 try:
2934 wio.close()
2935 except IOError as e:
2936 if e.errno != errno.EBADF:
2937 raise
2938
2939 def test_interrupted_write_unbuffered(self):
2940 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2941
2942 def test_interrupted_write_buffered(self):
2943 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2944
2945 def test_interrupted_write_text(self):
2946 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2947
Brett Cannon31f59292011-02-21 19:29:56 +00002948 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002949 def check_reentrant_write(self, data, **fdopen_kwargs):
2950 def on_alarm(*args):
2951 # Will be called reentrantly from the same thread
2952 wio.write(data)
2953 1/0
2954 signal.signal(signal.SIGALRM, on_alarm)
2955 r, w = os.pipe()
2956 wio = self.io.open(w, **fdopen_kwargs)
2957 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07002958 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002959 # Either the reentrant call to wio.write() fails with RuntimeError,
2960 # or the signal handler raises ZeroDivisionError.
2961 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2962 while 1:
2963 for i in range(100):
2964 wio.write(data)
2965 wio.flush()
2966 # Make sure the buffer doesn't fill up and block further writes
2967 os.read(r, len(data) * 100)
2968 exc = cm.exception
2969 if isinstance(exc, RuntimeError):
2970 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2971 finally:
2972 wio.close()
2973 os.close(r)
2974
2975 def test_reentrant_write_buffered(self):
2976 self.check_reentrant_write(b"xy", mode="wb")
2977
2978 def test_reentrant_write_text(self):
2979 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2980
Antoine Pitrou707ce822011-02-25 21:24:11 +00002981 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2982 """Check that a buffered read, when it gets interrupted (either
2983 returning a partial result or EINTR), properly invokes the signal
2984 handler and retries if the latter returned successfully."""
2985 r, w = os.pipe()
2986 fdopen_kwargs["closefd"] = False
2987 def alarm_handler(sig, frame):
2988 os.write(w, b"bar")
2989 signal.signal(signal.SIGALRM, alarm_handler)
2990 try:
2991 rio = self.io.open(r, **fdopen_kwargs)
2992 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07002993 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00002994 # Expected behaviour:
2995 # - first raw read() returns partial b"foo"
2996 # - second raw read() returns EINTR
2997 # - third raw read() returns b"bar"
2998 self.assertEqual(decode(rio.read(6)), "foobar")
2999 finally:
3000 rio.close()
3001 os.close(w)
3002 os.close(r)
3003
Antoine Pitrou20db5112011-08-19 20:32:34 +02003004 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003005 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3006 mode="rb")
3007
Antoine Pitrou20db5112011-08-19 20:32:34 +02003008 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003009 self.check_interrupted_read_retry(lambda x: x,
3010 mode="r")
3011
3012 @unittest.skipUnless(threading, 'Threading required for this test.')
3013 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3014 """Check that a buffered write, when it gets interrupted (either
3015 returning a partial result or EINTR), properly invokes the signal
3016 handler and retries if the latter returned successfully."""
3017 select = support.import_module("select")
3018 # A quantity that exceeds the buffer size of an anonymous pipe's
3019 # write end.
3020 N = 1024 * 1024
3021 r, w = os.pipe()
3022 fdopen_kwargs["closefd"] = False
3023 # We need a separate thread to read from the pipe and allow the
3024 # write() to finish. This thread is started after the SIGALRM is
3025 # received (forcing a first EINTR in write()).
3026 read_results = []
3027 write_finished = False
3028 def _read():
3029 while not write_finished:
3030 while r in select.select([r], [], [], 1.0)[0]:
3031 s = os.read(r, 1024)
3032 read_results.append(s)
3033 t = threading.Thread(target=_read)
3034 t.daemon = True
3035 def alarm1(sig, frame):
3036 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003037 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003038 def alarm2(sig, frame):
3039 t.start()
3040 signal.signal(signal.SIGALRM, alarm1)
3041 try:
3042 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07003043 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00003044 # Expected behaviour:
3045 # - first raw write() is partial (because of the limited pipe buffer
3046 # and the first alarm)
3047 # - second raw write() returns EINTR (because of the second alarm)
3048 # - subsequent write()s are successful (either partial or complete)
3049 self.assertEqual(N, wio.write(item * N))
3050 wio.flush()
3051 write_finished = True
3052 t.join()
3053 self.assertEqual(N, sum(len(x) for x in read_results))
3054 finally:
3055 write_finished = True
3056 os.close(w)
3057 os.close(r)
3058 # This is deliberate. If we didn't close the file descriptor
3059 # before closing wio, wio would try to flush its internal
3060 # buffer, and could block (in case of failure).
3061 try:
3062 wio.close()
3063 except IOError as e:
3064 if e.errno != errno.EBADF:
3065 raise
3066
Antoine Pitrou20db5112011-08-19 20:32:34 +02003067 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003068 self.check_interrupted_write_retry(b"x", mode="wb")
3069
Antoine Pitrou20db5112011-08-19 20:32:34 +02003070 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00003071 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3072
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003073
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003074class CSignalsTest(SignalsTest):
3075 io = io
3076
3077class PySignalsTest(SignalsTest):
3078 io = pyio
3079
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003080 # Handling reentrancy issues would slow down _pyio even more, so the
3081 # tests are disabled.
3082 test_reentrant_write_buffered = None
3083 test_reentrant_write_text = None
3084
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003085
Guido van Rossum28524c72007-02-27 05:47:44 +00003086def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003087 tests = (CIOTest, PyIOTest,
3088 CBufferedReaderTest, PyBufferedReaderTest,
3089 CBufferedWriterTest, PyBufferedWriterTest,
3090 CBufferedRWPairTest, PyBufferedRWPairTest,
3091 CBufferedRandomTest, PyBufferedRandomTest,
3092 StatefulIncrementalDecoderTest,
3093 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3094 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003095 CMiscIOTest, PyMiscIOTest,
3096 CSignalsTest, PySignalsTest,
3097 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003098
3099 # Put the namespaces of the IO module we are testing and some useful mock
3100 # classes in the __dict__ of each test.
3101 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00003102 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003103 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3104 c_io_ns = {name : getattr(io, name) for name in all_members}
3105 py_io_ns = {name : getattr(pyio, name) for name in all_members}
3106 globs = globals()
3107 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3108 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3109 # Avoid turning open into a bound method.
3110 py_io_ns["open"] = pyio.OpenWrapper
3111 for test in tests:
3112 if test.__name__.startswith("C"):
3113 for name, obj in c_io_ns.items():
3114 setattr(test, name, obj)
3115 elif test.__name__.startswith("Py"):
3116 for name, obj in py_io_ns.items():
3117 setattr(test, name, obj)
3118
3119 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00003120
3121if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003122 test_main()