blob: 5954999e6e9418f04a9d607cf0a77c5f995a1296 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +010045try:
46 import fcntl
47except ImportError:
48 fcntl = None
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
def _default_chunk_size():
    """Return the ``_CHUNK_SIZE`` attribute of a freshly opened text file.

    This very file is opened in text mode (decoded as latin-1) only to get
    hold of a text-mode file object whose ``_CHUNK_SIZE`` can be read.
    """
    text_file = open(__file__, "r", encoding="latin-1")
    try:
        return text_file._CHUNK_SIZE
    finally:
        text_file.close()
54
55
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    *read_stack* is an iterable of chunks handed out, in order, by
    readinto().  A ``None`` entry makes the next readinto() return ``None``.
    Everything passed to write() is recorded in ``_write_stack``.
    ``_reads`` counts readinto() calls and ``_extraneous_reads`` counts the
    calls made after the stack was exhausted.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        """Record a bytes copy of *b* and report all of it as written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # whence defaults to 0 like IOBase.seek(), so seek(pos) works too.
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next queued chunk into *buf*.

        Returns the number of bytes stored, ``None`` when the next queued
        entry is ``None``, or 0 once the queue is empty.  A chunk larger
        than *buf* is split and its tail stays at the head of the queue.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        return pos
111
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead combined with the C RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead combined with the pure-Python RawIOBase."""
117
118
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit read() that pops whole chunks."""

    def read(self, n=None):
        """Return the next queued chunk; *n* is ignored.

        Once the queue is empty, return ``b""`` and count the call in
        ``_extraneous_reads``.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue counts as an extraneous read; any
            # other exception is a real error and must propagate.
            self._extraneous_reads += 1
            return b""
128
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with the C RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with the pure-Python RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000134
Guido van Rossuma9e20242007-03-08 00:43:48 +0000135
class MisbehavedRawIO(MockRawIO):
    """A MockRawIO whose methods deliberately return bogus results."""

    def write(self, b):
        # Report twice as many bytes as the parent class recorded.
        return super().write(b) * 2

    def read(self, n=None):
        # Hand back the parent's chunk repeated twice.
        return super().read(n) * 2

    def seek(self, pos, whence=0):
        # whence defaults to 0 like IOBase.seek(); the result is a negative
        # position either way.
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Let the parent fill buf, then claim five times its capacity.
        super().readinto(buf)
        return len(buf) * 5
152
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with the C RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with the pure-Python RawIOBase."""
158
159
class CloseFailureIO(MockRawIO):
    """A MockRawIO whose first close() marks it closed and raises IOError."""

    closed = 0

    def close(self):
        # Later calls are silent no-ops: only the first one fails.
        if self.closed:
            return
        self.closed = 1
        raise IOError
167
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with the C RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with the pure-Python RawIOBase."""
173
174
class MockFileIO:
    """Mixin that logs the outcome of every read() and readinto() call.

    ``read_history`` receives the length of each read() result (or ``None``
    when read() returned ``None``) and the raw return value of each
    readinto().  The actual I/O is delegated to the next class in the MRO.
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000190
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO recording reads of a C BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO recording reads of a pure-Python BytesIO."""
196
197
class MockUnseekableIO:
    """Mixin that turns a stream into a non-seekable one.

    Concrete subclasses must provide an ``UnsupportedOperation`` attribute;
    seek() and tell() raise it.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")

    def tell(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")
207
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """Unseekable C BytesIO raising io.UnsupportedOperation."""

    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """Unseekable pure-Python BytesIO raising pyio.UnsupportedOperation."""

    UnsupportedOperation = pyio.UnsupportedOperation
213
214
class MockNonBlockWriterIO:
    """Writable mock that can simulate a write that would block.

    Written data is collected in ``_write_stack``.  After block_on(char),
    a write() containing *char* is cut short right before it; a write()
    that starts with *char* returns ``None`` and disarms the blocker.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and reset the record."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        data = bytes(b)
        if self._blocker_char:
            # find() yields -1 when the blocker is absent, in which case
            # the whole payload is written below.
            pos = data.find(self._blocker_char)
            if pos > 0:
                # write data up to the first blocker
                self._write_stack.append(data[:pos])
                return pos
            if pos == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(data)
        return len(data)
258
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO combined with the C RawIOBase."""

    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO combined with the pure-Python RawIOBase."""

    BlockingIOError = pyio.BlockingIOError
264
Guido van Rossuma9e20242007-03-08 00:43:48 +0000265
Guido van Rossum28524c72007-02-27 05:47:44 +0000266class IOTest(unittest.TestCase):
267
Neal Norwitze7789b12008-03-24 06:18:09 +0000268 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000269 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000270
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000271 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000272 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000273
    def write_ops(self, f):
        """Run a fixed write/seek/truncate sequence on *f*, checking results.

        The expected offsets assume *f* starts out empty.  The statements
        are order-dependent: each one relies on the position and content
        left behind by the previous ones.
        """
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # The position is expected to stay at 5 even though the content
        # was truncated to nothing.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        # Step back over the "." so the next write replaces it.
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        # 14 bytes have been written in total, so end - 1 is 13.
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        # Again, truncating is expected not to move the position.
        self.assertEqual(f.tell(), 13)
        # A float offset must be rejected.
        self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000295
    def read_ops(self, f, buffered=False):
        """Run a fixed read/readinto/seek sequence on *f*, checking results.

        The assertions expect *f* to contain exactly b"hello world\\n".
        With *buffered* true, read() without a size argument is exercised
        as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        # Reuse the 5 bytes as a mutable buffer for readinto().
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Only 2 bytes remain; the buffer keeps its size.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        # At EOF both read() and readinto() report nothing.
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        # Zero-sized reads return empty results.
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        # A float offset must be rejected.
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")
323
Guido van Rossum34d69e52007-04-10 20:08:41 +0000324 LARGE = 2**31
325
Guido van Rossum53807da2007-04-10 19:01:47 +0000326 def large_file_ops(self, f):
327 assert f.readable()
328 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000329 self.assertEqual(f.seek(self.LARGE), self.LARGE)
330 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000331 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000332 self.assertEqual(f.tell(), self.LARGE + 3)
333 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000334 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000335 self.assertEqual(f.tell(), self.LARGE + 2)
336 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000337 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000338 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000339 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
340 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000341 self.assertEqual(f.read(2), b"x")
342
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000343 def test_invalid_operations(self):
344 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000345 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000346 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000347 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000348 self.assertRaises(exc, fp.read)
349 self.assertRaises(exc, fp.readline)
350 with self.open(support.TESTFN, "wb", buffering=0) as fp:
351 self.assertRaises(exc, fp.read)
352 self.assertRaises(exc, fp.readline)
353 with self.open(support.TESTFN, "rb", buffering=0) as fp:
354 self.assertRaises(exc, fp.write, b"blah")
355 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000356 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000357 self.assertRaises(exc, fp.write, b"blah")
358 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000359 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000360 self.assertRaises(exc, fp.write, "blah")
361 self.assertRaises(exc, fp.writelines, ["blah\n"])
362 # Non-zero seeking from current or end pos
363 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
364 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000365
Guido van Rossum28524c72007-02-27 05:47:44 +0000366 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000367 with self.open(support.TESTFN, "wb", buffering=0) as f:
368 self.assertEqual(f.readable(), False)
369 self.assertEqual(f.writable(), True)
370 self.assertEqual(f.seekable(), True)
371 self.write_ops(f)
372 with self.open(support.TESTFN, "rb", buffering=0) as f:
373 self.assertEqual(f.readable(), True)
374 self.assertEqual(f.writable(), False)
375 self.assertEqual(f.seekable(), True)
376 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000377
Guido van Rossum87429772007-04-10 21:06:59 +0000378 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000379 with self.open(support.TESTFN, "wb") as f:
380 self.assertEqual(f.readable(), False)
381 self.assertEqual(f.writable(), True)
382 self.assertEqual(f.seekable(), True)
383 self.write_ops(f)
384 with self.open(support.TESTFN, "rb") as f:
385 self.assertEqual(f.readable(), True)
386 self.assertEqual(f.writable(), False)
387 self.assertEqual(f.seekable(), True)
388 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000389
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000390 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000391 with self.open(support.TESTFN, "wb") as f:
392 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
393 with self.open(support.TESTFN, "rb") as f:
394 self.assertEqual(f.readline(), b"abc\n")
395 self.assertEqual(f.readline(10), b"def\n")
396 self.assertEqual(f.readline(2), b"xy")
397 self.assertEqual(f.readline(4), b"zzy\n")
398 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000399 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000400 self.assertRaises(TypeError, f.readline, 5.3)
401 with self.open(support.TESTFN, "r") as f:
402 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000403
Guido van Rossum28524c72007-02-27 05:47:44 +0000404 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000405 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000406 self.write_ops(f)
407 data = f.getvalue()
408 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000409 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000410 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000411
Guido van Rossum53807da2007-04-10 19:01:47 +0000412 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000413 # On Windows and Mac OSX this test comsumes large resources; It takes
414 # a long time to build the >2GB file and takes >2GB of disk space
415 # therefore the resource must be enabled to run this test.
416 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000417 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000418 print("\nTesting large file ops skipped on %s." % sys.platform,
419 file=sys.stderr)
420 print("It requires %d bytes and a long time." % self.LARGE,
421 file=sys.stderr)
422 print("Use 'regrtest.py -u largefile test_io' to run it.",
423 file=sys.stderr)
424 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000425 with self.open(support.TESTFN, "w+b", 0) as f:
426 self.large_file_ops(f)
427 with self.open(support.TESTFN, "w+b") as f:
428 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000429
430 def test_with_open(self):
431 for bufsize in (0, 1, 100):
432 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000433 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000434 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000435 self.assertEqual(f.closed, True)
436 f = None
437 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000438 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000439 1/0
440 except ZeroDivisionError:
441 self.assertEqual(f.closed, True)
442 else:
443 self.fail("1/0 didn't raise an exception")
444
Antoine Pitrou08838b62009-01-21 00:55:13 +0000445 # issue 5008
446 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000447 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000448 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000449 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000450 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000451 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000452 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000454 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000455
    def test_destructor(self):
        """Collecting an unclosed FileIO runs __del__, close(), then flush().

        The overridden methods append 1, 2 and 3 to *record*; the recorded
        order is asserted, as is the fact that the written data reached
        the file.
        """
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                # The base class may not define __del__ at all; only chain
                # up when it does.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        # check_warnings is given a ResourceWarning filter because the file
        # is dropped below without being closed.
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000481
    def _check_base_destructor(self, base):
        """Check the finalization order of a subclass of *base*.

        An instance of a subclass of *base* is created and dropped; its
        __del__, close() and flush() overrides must have run in that order,
        each reading an attribute set in __init__.
        """
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                # Only chain up when the base class defines __del__.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
511
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000512 def test_IOBase_destructor(self):
513 self._check_base_destructor(self.IOBase)
514
515 def test_RawIOBase_destructor(self):
516 self._check_base_destructor(self.RawIOBase)
517
518 def test_BufferedIOBase_destructor(self):
519 self._check_base_destructor(self.BufferedIOBase)
520
521 def test_TextIOBase_destructor(self):
522 self._check_base_destructor(self.TextIOBase)
523
Guido van Rossum87429772007-04-10 21:06:59 +0000524 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000525 with self.open(support.TESTFN, "wb") as f:
526 f.write(b"xxx")
527 with self.open(support.TESTFN, "rb") as f:
528 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000529
Guido van Rossumd4103952007-04-12 05:44:49 +0000530 def test_array_writes(self):
531 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000532 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000533 with self.open(support.TESTFN, "wb", 0) as f:
534 self.assertEqual(f.write(a), n)
535 with self.open(support.TESTFN, "wb") as f:
536 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000537
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000538 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000539 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000540 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000541
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000542 def test_read_closed(self):
543 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000544 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000545 with self.open(support.TESTFN, "r") as f:
546 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000547 self.assertEqual(file.read(), "egg\n")
548 file.seek(0)
549 file.close()
550 self.assertRaises(ValueError, file.read)
551
552 def test_no_closefd_with_filename(self):
553 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000554 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000555
556 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000557 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000558 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000559 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000560 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000561 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000562 self.assertEqual(file.buffer.raw.closefd, False)
563
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000564 def test_garbage_collection(self):
565 # FileIO objects are collected, and collecting them flushes
566 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000567 with support.check_warnings(('', ResourceWarning)):
568 f = self.FileIO(support.TESTFN, "wb")
569 f.write(b"abcxxx")
570 f.f = f
571 wr = weakref.ref(f)
572 del f
573 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000574 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000575 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000576 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000577
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000578 def test_unbounded_file(self):
579 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
580 zero = "/dev/zero"
581 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000582 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000583 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000584 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000585 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000586 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000587 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000588 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000589 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000590 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000591 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000592 self.assertRaises(OverflowError, f.read)
593
Antoine Pitrou6be88762010-05-03 16:48:20 +0000594 def test_flush_error_on_close(self):
595 f = self.open(support.TESTFN, "wb", buffering=0)
596 def bad_flush():
597 raise IOError()
598 f.flush = bad_flush
599 self.assertRaises(IOError, f.close) # exception not swallowed
600
601 def test_multi_close(self):
602 f = self.open(support.TESTFN, "wb", buffering=0)
603 f.close()
604 f.close()
605 f.close()
606 self.assertRaises(ValueError, f.flush)
607
Antoine Pitrou328ec742010-09-14 18:37:24 +0000608 def test_RawIOBase_read(self):
609 # Exercise the default RawIOBase.read() implementation (which calls
610 # readinto() internally).
611 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
612 self.assertEqual(rawio.read(2), b"ab")
613 self.assertEqual(rawio.read(2), b"c")
614 self.assertEqual(rawio.read(2), b"d")
615 self.assertEqual(rawio.read(2), None)
616 self.assertEqual(rawio.read(2), b"ef")
617 self.assertEqual(rawio.read(2), b"g")
618 self.assertEqual(rawio.read(2), None)
619 self.assertEqual(rawio.read(2), b"")
620
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400621 def test_types_have_dict(self):
622 test = (
623 self.IOBase(),
624 self.RawIOBase(),
625 self.TextIOBase(),
626 self.StringIO(),
627 self.BytesIO()
628 )
629 for obj in test:
630 self.assertTrue(hasattr(obj, "__dict__"))
631
Ross Lagerwall59142db2011-10-31 20:34:46 +0200632 def test_opener(self):
633 with self.open(support.TESTFN, "w") as f:
634 f.write("egg\n")
635 fd = os.open(support.TESTFN, os.O_RDONLY)
636 def opener(path, flags):
637 return fd
638 with self.open("non-existent", "r", opener=opener) as f:
639 self.assertEqual(f.read(), "egg\n")
640
class CIOTest(IOTest):
    """IOTest plus checks specific to the C implementation."""

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        # assertIsNone reports the surviving object on failure.
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000660
class PyIOTest(IOTest):
    """IOTest without any additional tests."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000663
Guido van Rossuma9e20242007-03-08 00:43:48 +0000664
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000665class CommonBufferedTests:
666 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
667
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000668 def test_detach(self):
669 raw = self.MockRawIO()
670 buf = self.tp(raw)
671 self.assertIs(buf.detach(), raw)
672 self.assertRaises(ValueError, buf.detach)
673
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000674 def test_fileno(self):
675 rawio = self.MockRawIO()
676 bufio = self.tp(rawio)
677
Ezio Melottib3aedd42010-11-20 19:04:17 +0000678 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000679
680 def test_no_fileno(self):
681 # XXX will we always have fileno() function? If so, kill
682 # this test. Else, write it.
683 pass
684
685 def test_invalid_args(self):
686 rawio = self.MockRawIO()
687 bufio = self.tp(rawio)
688 # Invalid whence
689 self.assertRaises(ValueError, bufio.seek, 0, -1)
690 self.assertRaises(ValueError, bufio.seek, 0, 3)
691
    def test_override_destructor(self):
        """Check which overridden methods run when the object is collected.

        __del__ and close() must always be recorded, in that order; flush()
        is expected afterwards only when the buffered object is writable.
        """
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                # Only chain up when the base class defines __del__.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        # Must be queried before the object is deleted.
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])
719
720 def test_context_manager(self):
721 # Test usability as a context manager
722 rawio = self.MockRawIO()
723 bufio = self.tp(rawio)
724 def _with():
725 with bufio:
726 pass
727 _with()
728 # bufio should now be closed, and using it a second time should raise
729 # a ValueError.
730 self.assertRaises(ValueError, _with)
731
732 def test_error_through_destructor(self):
733 # Test that the exception state is not modified by a destructor,
734 # even if close() fails.
735 rawio = self.CloseFailureIO()
736 def f():
737 self.tp(rawio).xyzzy
738 with support.captured_output("stderr") as s:
739 self.assertRaises(AttributeError, f)
740 s = s.getvalue().strip()
741 if s:
742 # The destructor *may* have printed an unraisable error, check it
743 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000744 self.assertTrue(s.startswith("Exception IOError: "), s)
745 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000746
Antoine Pitrou716c4442009-05-23 19:04:03 +0000747 def test_repr(self):
748 raw = self.MockRawIO()
749 b = self.tp(raw)
750 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
751 self.assertEqual(repr(b), "<%s>" % clsname)
752 raw.name = "dummy"
753 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
754 raw.name = b"dummy"
755 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
756
Antoine Pitrou6be88762010-05-03 16:48:20 +0000757 def test_flush_error_on_close(self):
758 raw = self.MockRawIO()
759 def bad_flush():
760 raise IOError()
761 raw.flush = bad_flush
762 b = self.tp(raw)
763 self.assertRaises(IOError, b.close) # exception not swallowed
764
765 def test_multi_close(self):
766 raw = self.MockRawIO()
767 b = self.tp(raw)
768 b.close()
769 b.close()
770 b.close()
771 self.assertRaises(ValueError, b.flush)
772
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000773 def test_unseekable(self):
774 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
775 self.assertRaises(self.UnsupportedOperation, bufio.tell)
776 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
777
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000778 def test_readonly_attributes(self):
779 raw = self.MockRawIO()
780 buf = self.tp(raw)
781 x = self.MockRawIO()
782 with self.assertRaises(AttributeError):
783 buf.raw = x
784
Guido van Rossum78892e42007-04-06 17:31:18 +0000785
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Read-side tests for the buffered type stored in ``tp`` (set by the
    # concrete subclasses).
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be re-run with valid sizes; non-positive sizes raise
        # ValueError, and a later valid re-initialisation works again.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        for size in (1024, 16):
            bufio.__init__(rawio, buffer_size=size)
        self.assertEqual(b"abc", bufio.read())
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        # read(None) and read(7) both return the full seven bytes.
        for size in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(size))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # Track how many raw reads each read1() call triggers.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        # (expected data, requested size, cumulative raw reads afterwards)
        steps = (
            (b"b", 1, 1),
            (b"c", 100, 1),
            (b"d", 100, 2),
            (b"efg", 100, 3),
            (b"", 100, 4),
        )
        for expected, size, raw_reads in steps:
            self.assertEqual(expected, bufio.read1(size))
            self.assertEqual(rawio._reads, raw_reads)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)

    def test_readinto(self):
        # The same two-byte target is reused, so a short read leaves the
        # trailing byte from the previous call in place.
        target = bytearray(2)

        def check(bufio, steps):
            for count_read, contents in steps:
                self.assertEqual(bufio.readinto(target), count_read)
                self.assertEqual(target, contents)

        check(self.tp(self.MockRawIO((b"abc", b"d", b"efg"))),
              ((2, b"ab"), (2, b"cd"), (2, b"ef"), (1, b"gf"), (0, b"gf")))
        check(self.tp(self.MockRawIO((b"abc", None))),
              ((2, b"ab"), (1, b"cb")))

    def test_readlines(self):
        # Each call gets a fresh reader over the same three chunks.
        def fresh_reader():
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

        self.assertEqual(fresh_reader().readlines(),
                         [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(fresh_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh_reader().readlines(None),
                         [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        total = len(data)

        # (buffer size, sizes passed to bufio.read, expected raw read sizes)
        scenarios = [
            (100, [3, 1, 4, 8], [total, 0]),
            (100, [3, 3, 3], [total]),
            (4, [1, 2, 4, 2], [4, 4, 1]),
        ]

        for bufsize, requested, raw_expected in scenarios:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            offset = 0
            for nbytes in requested:
                self.assertEqual(bufio.read(nbytes),
                                 data[offset:offset+nbytes])
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_expected)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # The raw mock itself: readall() yields the data, then None.
        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        # read() without a size drains every chunk.
        bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []

                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for size in cycle([1, 19]):
                            chunk = bufio.read(size)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                threads = [threading.Thread(target=reader) for _ in range(20)]
                for worker in threads:
                    worker.start()
                time.sleep(0.02) # yield
                for worker in threads:
                    worker.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                for value in range(256):
                    needle = bytes(bytearray([value]))
                    self.assertEqual(combined.count(needle), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # tell()/seek() stay unsupported both before and after a read.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        unsupported = self.UnsupportedOperation
        self.assertRaises(unsupported, bufio.tell)
        self.assertRaises(unsupported, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(unsupported, bufio.seek, 0)
        self.assertRaises(unsupported, bufio.tell)

    def test_misbehaved_io(self):
        # seek()/tell() over the misbehaving raw mock raise IOError.
        bufio = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        with self.assertRaises(IOError):
            bufio.seek(0)
        with self.assertRaises(IOError):
            bufio.tell()

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            wanted = b"x" * n
            # First the simple case where one raw read is enough to satisfy
            # the request, then a more complex case where two raw reads are
            # needed.
            for chunks in ([b"x" * n], [b"x" * (n - 1), b"x"]):
                rawio = self.MockRawIO(chunks)
                bufio = self.tp(rawio, bufsize)
                self.assertEqual(bufio.read(n), wanted)
                self.assertEqual(rawio._extraneous_reads, 0,
                    "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
981
982
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the BufferedReaderTest suite against io.BufferedReader and adds
    # checks that only apply to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialisation the object must refuse reads.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # Opening TESTFN in "w+b" mode creates the file on disk; register its
        # removal so the test does not leave it behind, pass or fail.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.f = f  # reference cycle: only the cyclic GC can reclaim it
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001023
class PyBufferedReaderTest(BufferedReaderTest):
    # Same suite as BufferedReaderTest, bound to pyio's BufferedReader.
    # NOTE(review): ``pyio`` is presumably the pure-Python _pyio module --
    # confirm against the file's imports.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001026
Guido van Rossuma9e20242007-03-08 00:43:48 +00001027
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Write-side tests for the buffered type stored in ``tp`` (set by the
    # concrete subclasses).
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be re-run with valid sizes; non-positive sizes raise
        # ValueError, and a later valid re-initialisation works again.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push buffered data to the raw stream first.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # ``intermediate_func`` is called with the buffered object after each
        # write, letting callers interleave flushes, seeks or truncates.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seeking away and back (absolute, then relative) between writes
        # must not disturb the written contents.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
                         b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Rewinding and overwriting must flush pending data in position.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        # An explicit flush() pushes buffered data to the raw stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush buffered data as well.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        #
        # The test creates TESTFN on disk; register its removal so the file
        # does not outlive the test, pass or fail.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # Truncating does not move the stream position.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunks may land in any order, so only per-byte counts are
            # checked.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek(), tell() and an oversized write over the misbehaving raw mock
        # all raise IOError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing the third positional argument triggers the deprecation
        # warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001243
1244
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the BufferedWriterTest suite against io.BufferedWriter and adds
    # checks that only apply to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                              bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialisation the object must refuse writes.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # Opening TESTFN in "w+b" mode creates the file on disk; register its
        # removal so the test does not leave it behind, pass or fail.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        f.x = f  # reference cycle: only the cyclic GC can reclaim it
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1282
1283
class PyBufferedWriterTest(BufferedWriterTest):
    # Same suite as BufferedWriterTest, bound to pyio's BufferedWriter.
    # NOTE(review): ``pyio`` is presumably the pure-Python _pyio module --
    # confirm against the file's imports.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001286
class BufferedRWPairTest(unittest.TestCase):
    # Tests for the reader/writer pair type stored in ``tp`` (set by the
    # concrete subclasses).  The first constructor argument is the reader
    # side, the second the writer side.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        # A pair wraps two streams, so detach() is not supported.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        # A reader side that reports itself unreadable is rejected.
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        # A writer side that reports itself unwritable is rejected.
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        # read(None) behaves like read() with no argument.
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # Each call gets a fresh pair over the same data.  The cases mirror
        # BufferedReaderTest.test_readlines: no hint, an explicit None hint
        # (previously the no-hint assertion was simply repeated), and a hint
        # that cuts the result short.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        # Each write followed by flush() reaches the writer side as one chunk.
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        # peek() must not consume data: the following read sees it again.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair is a tty as soon as either side is one.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001404
class CBufferedRWPairTest(BufferedRWPairTest):
    # Same suite as BufferedRWPairTest, bound to io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001407
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Same suite as BufferedRWPairTest, bound to pyio's BufferedRWPair.
    # NOTE(review): ``pyio`` is presumably the pure-Python _pyio module --
    # confirm against the file's imports.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001410
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001411
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared tests for BufferedRandom.  Both the reader and the writer test
    # suites are inherited; concrete subclasses provide ``tp``.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Both parent suites define test_constructor; run each explicitly.
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_constructor(self)

    def test_read_and_write(self):
        mock_raw = self.MockRawIO((b"asdf", b"ghjk"))
        bufrw = self.tp(mock_raw, 8)

        self.assertEqual(b"as", bufrw.read(2))
        for payload in (b"ddd", b"eee"):
            bufrw.write(payload)
        # Writes stay buffered: nothing has reached the raw stream yet.
        self.assertFalse(mock_raw._write_stack)
        self.assertEqual(b"ghjk", bufrw.read())
        self.assertEqual(b"dddeee", mock_raw._write_stack[0])

    def test_seek_and_tell(self):
        backing = self.BytesIO(b"asdfghjkl")
        bufrw = self.tp(backing)

        self.assertEqual(b"as", bufrw.read(2))
        self.assertEqual(2, bufrw.tell())
        bufrw.seek(0, 0)
        self.assertEqual(b"asdf", bufrw.read(4))

        # Overwrite in the middle, then re-read everything from the start.
        bufrw.write(b"123f")
        bufrw.seek(0, 0)
        self.assertEqual(b"asdf123fl", bufrw.read())
        self.assertEqual(9, bufrw.tell())
        # Seek relative to the end, then relative to the current position.
        bufrw.seek(-4, 2)
        self.assertEqual(5, bufrw.tell())
        bufrw.seek(2, 1)
        self.assertEqual(7, bufrw.tell())
        self.assertEqual(b"fl", bufrw.read(11))
        bufrw.flush()
        self.assertEqual(b"asdf123fl", backing.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, bufrw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario: interleave reads (through ``read_func``), writes
        # and flushes.  ``read_func`` is called as read_func(bufio[, n]) and
        # must return the bytes read while advancing the stream position.
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        self.assertEqual(b"ab", read_func(stream, 2))
        stream.write(b"12")
        self.assertEqual(b"ef", read_func(stream, 2))
        self.assertEqual(6, stream.tell())
        stream.flush()
        self.assertEqual(6, stream.tell())
        self.assertEqual(b"ghi", read_func(stream))
        # Modify the raw stream behind the buffered object's back.
        backing.seek(0, 0)
        backing.write(b"XYZ")
        # flush() resets the read buffer
        stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(stream, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # A negative n means "read everything": use an oversized buffer.
            if n >= 0:
                capacity = n
            else:
                capacity = 9999
            target = bytearray(capacity)
            filled = bufio.readinto(target)
            return bytes(target[:filled])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # peek() does not move the position; advance it by hand.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        for payload in (b"123", b"45"):
            stream.write(payload)
            stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"12345fghi", backing.getvalue())
        self.assertEqual(b"12345fghi", stream.read())

    def test_threads(self):
        # Both parent suites define test_threads; run each explicitly.
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_threads(self)

    def test_writes_and_peek(self):
        # First variant: peek right at the current position.
        def _peek_in_place(bufio):
            bufio.peek(1)
        self.check_writes(_peek_in_place)

        # Second variant: step back one byte, peek, restore the position.
        def _peek_behind(bufio):
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)
        self.check_writes(_peek_behind)

    def test_writes_and_reads(self):
        def _reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_reread_last_byte)

    def test_writes_and_read1s(self):
        def _reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_reread_last_byte)

    def test_writes_and_readintos(self):
        def _reread_last_byte(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_reread_last_byte)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            backing = self.BytesIO(b"A" * 10)
            stream = self.tp(backing, 4)
            end_pos = overwrite_size + 1
            # Trigger readahead
            self.assertEqual(stream.read(1), b"A")
            self.assertEqual(stream.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            stream.write(b"B" * overwrite_size)
            self.assertEqual(stream.tell(), end_pos)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            stream.flush()
            self.assertEqual(stream.tell(), end_pos)
            expected = (b"A" + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            self.assertEqual(backing.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards /
        # writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for i in range(0, size):
            for j in range(i, size):
                backing = self.BytesIO(original)
                stream = self.tp(backing, 100)
                mutate(stream, i, j)
                stream.flush()
                # Byte j is written first and byte i second, so when i == j
                # the later write (1) wins.
                expected = bytearray(original)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(backing.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        backing = self.BytesIO(b"A" * 10)
        stream = self.tp(backing, 100)
        # the read buffer gets filled
        self.assertEqual(stream.read(2), b"AA")
        self.assertEqual(stream.truncate(), 2)
        # the write buffer increases
        self.assertEqual(stream.write(b"BB"), 2)
        self.assertEqual(stream.truncate(), 4)

    def test_misbehaved_io(self):
        # Both parent suites define test_misbehaved_io; run each explicitly.
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as backing:
            with self.tp(backing, 100) as stream:
                stream.write(b"1")
                self.assertEqual(stream.read(1), b'b')
                stream.write(b'2')
                self.assertEqual(stream.read1(1), b'd')
                stream.write(b'3')
                one_byte = bytearray(1)
                stream.readinto(one_byte)
                self.assertEqual(one_byte, b'f')
                stream.write(b'4')
                self.assertEqual(stream.peek(1), b'h')
                stream.flush()
                self.assertEqual(backing.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as backing:
            with self.tp(backing, 100) as stream:
                self.assertEqual(stream.read(1), b'a')
                stream.write(b"2")
                self.assertEqual(stream.read(1), b'c')
                stream.flush()
                self.assertEqual(backing.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Each one-byte write is followed by a readline() that returns the
        # remainder of the current line.
        with self.BytesIO(b'ab\ncdef\ng\n') as backing:
            with self.tp(backing) as stream:
                stream.write(b'1')
                self.assertEqual(stream.readline(), b'b\n')
                stream.write(b'2')
                self.assertEqual(stream.readline(), b'def\n')
                stream.write(b'3')
                self.assertEqual(stream.readline(), b'\n')
                stream.flush()
                self.assertEqual(backing.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1635
class CBufferedRandomTest(BufferedRandomTest):
    # Run the shared BufferedRandom tests against the ``io`` module's class.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw_mock = self.MockRawIO()
        buffered = self.tp(raw_mock)
        # Re-initialising with an absurd buffer size must fail cleanly.
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          buffered.__init__, raw_mock, sys.maxsize)

    def test_garbage_collection(self):
        # Run the garbage-collection checks of both C-side parent suites.
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)
1652
class PyBufferedRandomTest(BufferedRandomTest):
    # Run the shared BufferedRandom tests against the ``pyio`` module's class.
    tp = pyio.BufferedRandom
1655
1656
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001657# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1658# properties:
1659# - A single output character can correspond to many bytes of input.
1660# - The number of input bytes to complete the character can be
1661# undetermined until the last input byte is received.
1662# - The number of input bytes can vary depending on previous input.
1663# - A single input byte can correspond to many characters of output.
1664# - The number of output characters can be undetermined until the
1665# last input byte is received.
1666# - The number of output characters can vary depending on previous input.
1667
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Restore the initial state: I = 1, O = 1, empty input buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered input bytes, integer flags encoding I and O)."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes of *input*; return the text of completed words.

        With a true *final*, any word still buffered is emitted as well.
        """
        # Collect pieces and join once instead of repeated string +=.
        pieces = []
        period = ord('.')
        for b in input:
            # self.i is re-read on every byte because process_word() may
            # change it in the middle of the input.
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer: # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        'i...' and 'o...' words only update I or O and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if self.o:
                # pad with hyphens, then truncate, to exactly O characters
                output = output.ljust(self.o, '-')[:self.o]
            output += '.'
        self.buffer = bytearray()
        return output

    # Lookups for 'test_decoder' only succeed while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: resolve 'test_decoder' when enabled.

        Returns None for any other name or while codecEnabled is false.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
1752
# Register the lookup function of the decoder defined above for testing.
# The lookup resolves nothing while codecEnabled is False (its default);
# tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1756
1757
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001758class StatefulIncrementalDecoderTest(unittest.TestCase):
1759 """
1760 Make sure the StatefulIncrementalDecoder actually works.
1761 """
1762
1763 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001764 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001765 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001766 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001767 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001768 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001769 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001770 # I=0, O=6 (variable-length input, fixed-length output)
1771 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1772 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001773 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001774 # I=6, O=3 (fixed-length input > fixed-length output)
1775 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1776 # I=0, then 3; O=29, then 15 (with longer output)
1777 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1778 'a----------------------------.' +
1779 'b----------------------------.' +
1780 'cde--------------------------.' +
1781 'abcdefghijabcde.' +
1782 'a.b------------.' +
1783 '.c.------------.' +
1784 'd.e------------.' +
1785 'k--------------.' +
1786 'l--------------.' +
1787 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001788 ]
1789
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001790 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001791 # Try a few one-shot test cases.
1792 for input, eof, output in self.test_cases:
1793 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001794 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001795
1796 # Also test an unfinished decode, followed by forcing EOF.
1797 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001798 self.assertEqual(d.decode(b'oiabcd'), '')
1799 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001800
1801class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001802
def setUp(self):
    # Sample bytes mixing "\r\n", "\r" and "\n" line endings, plus the same
    # text with every line ending normalized to "\n".
    raw_sample = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    normalized_sample = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.testdata = raw_sample
    self.normalized = normalized_sample.decode("ascii")
    # Start every test without a leftover scratch file.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001807
def tearDown(self):
    # Remove the scratch file a test may have created.
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001810
def test_constructor(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    # Re-initialising an existing wrapper must update its settings.
    text.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(text.encoding, "latin-1")
    self.assertEqual(text.line_buffering, False)

    text.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(text.encoding, "utf-8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())

    # Invalid newline arguments: wrong type, then unsupported value.
    for exc_type, bad_newline in ((TypeError, 42), (ValueError, 'xyzzy')):
        self.assertRaises(exc_type, text.__init__, buffered,
                          newline=bad_newline)
1824
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)

    # detach() hands back the very buffer object that was wrapped.
    text = self.TextIOWrapper(buffered)
    self.assertIs(text.detach(), buffered)

    # Pending text must reach the raw stream when the wrapper is detached.
    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    self.assertFalse(raw.getvalue())
    text.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    # A second detach() on the same wrapper is an error.
    self.assertRaises(ValueError, text.detach)
1837
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    # Without a name or mode only the encoding is shown.
    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    # A name set on the raw stream shows up in the repr.
    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    # So does a mode set on the wrapper itself.
    text.mode = "r"
    expected = ("<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(text), expected)

    # A bytes name is rendered with its b'' prefix.
    raw.name = b"dummy"
    expected = ("<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>"
                % modname)
    self.assertEqual(repr(text), expected)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001854
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)

    # No line ending written yet: no flush happened.
    text.write("X")
    self.assertEqual(raw.getvalue(), b"")
    # A "\n" in the written text flushes everything written so far.
    text.write("Y\nZ")
    self.assertEqual(raw.getvalue(), b"XY\nZ")
    # A "\r" in the written text triggers a flush as well.
    text.write("A\rB")
    self.assertEqual(raw.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001865
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()

    explicit = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(explicit.encoding, "utf-8")

    # With no encoding given, a usable default must still be reported.
    implicit = self.TextIOWrapper(raw)
    self.assertIsNotNone(implicit.encoding)
    codecs.lookup(implicit.encoding)
1874
def test_encoding_errors_reading(self):
    def wrap(**options):
        # Fresh ASCII wrapper over data containing one undecodable byte.
        raw = self.BytesIO(b"abc\n\xff\n")
        return self.TextIOWrapper(raw, encoding="ascii", **options)

    # (1) default
    self.assertRaises(UnicodeError, wrap().read)
    # (2) explicit strict
    self.assertRaises(UnicodeError, wrap(errors="strict").read)
    # (3) ignore
    self.assertEqual(wrap(errors="ignore").read(), "abc\n\n")
    # (4) replace
    self.assertEqual(wrap(errors="replace").read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001892
def test_encoding_errors_writing(self):
    # (1) default
    raw = self.BytesIO()
    text = self.TextIOWrapper(raw, encoding="ascii")
    self.assertRaises(UnicodeError, text.write, "\xff")

    # (2) explicit strict
    raw = self.BytesIO()
    text = self.TextIOWrapper(raw, encoding="ascii", errors="strict")
    self.assertRaises(UnicodeError, text.write, "\xff")

    # (3) ignore drops the unencodable character,
    # (4) replace substitutes "?" for it.
    lenient_cases = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for handler, expected in lenient_cases:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", errors=handler,
                                  newline="\n")
        text.write("abc\xffdef\n")
        text.flush()
        self.assertEqual(raw.getvalue(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001916
def test_newlines(self):
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # (newline argument, lines expected when reading with it)
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )
    joined_input = ''.join(input_lines)

    def collect_lines(textio, do_reads):
        # Either iterate the stream directly, or mix read(2) with readline().
        if not do_reads:
            return list(textio)
        collected = []
        for head in iter(lambda: textio.read(2), ''):
            self.assertEqual(len(head), 2)
            collected.append(head + textio.readline())
        return collected

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = joined_input.encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    got_lines = collect_lines(textio, do_reads)

                    # Compare pairwise first, then check the line counts.
                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001958
def test_newlines_input(self):
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    # (newline argument, lines expected from readlines())
    scenarios = [
        (None, normalized.decode("ascii").splitlines(keepends=True)),
        ("", testdata.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in scenarios:
        raw = self.BytesIO(testdata)
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        self.assertEqual(text.readlines(), expected)
        # After rewinding, a plain read() must return the same content.
        text.seek(0)
        self.assertEqual(text.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001974
def test_newlines_output(self):
    # Expected raw bytes for each explicit newline setting.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to match the entry for os.linesep.
    tests = [(None, testdict[os.linesep])]
    tests.extend(sorted(testdict.items()))
    chunks = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        for chunk in chunks:
            text.write(chunk)
        text.flush()
        self.assertEqual(raw.closed, False)
        self.assertEqual(raw.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001992
def test_destructor(self):
    # Dropping the last reference to the wrapper must flush pending text
    # and close the underlying stream.
    seen_at_close = []
    base = self.BytesIO

    class MyBytesIO(base):
        def close(self):
            # Record what had been written by the time close() runs.
            seen_at_close.append(self.getvalue())
            base.close(self)

    raw = MyBytesIO()
    text = self.TextIOWrapper(raw, encoding="ascii")
    text.write("abc")
    del text
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002006
def test_override_destructor(self):
    # Overridden __del__, close() and flush() must run in that order when
    # the wrapper is destroyed.
    record = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            # The parent class may or may not define __del__.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            record.append(2)
            super().close()

        def flush(self):
            record.append(3)
            super().flush()

    raw = self.BytesIO()
    text = MyTextIO(raw, encoding="ascii")
    del text
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
2029
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00002044
Guido van Rossum9b76da62007-04-11 01:09:03 +00002045 # Systematic tests of the text I/O API
2046
def test_basic_io(self):
    # Write, read, seek and tell on a real file for several chunk sizes and
    # encodings.  Files are opened with ``with`` so that a failing assertion
    # cannot leave a handle open (which would also stop tearDown from
    # removing the file on platforms that refuse to delete open files).
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position cookie for the end of the initial content.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append, then seek back to the saved cookie and re-read.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2075
def multi_line_test(self, f, enc):
    # Helper: rewrite *f* with lines of many different lengths, recording
    # tell() before each write, then check that reading the file back line
    # by line yields the same (position, line) pairs.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    sample_len = len(sample)
    sizes = (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000)

    written = []
    for size in sizes:
        # Cycle through the sample characters to build a line of ``size``.
        body = "".join([sample[k % sample_len] for k in range(size)])
        line = body + "\n"
        written.append((f.tell(), line))
        f.write(line)

    f.seek(0)
    read_back = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        read_back.append((pos, line))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002097
def test_telling(self):
    # tell() must report consistent positions after writes, after
    # readline(), and once iteration over the file has finished.
    # The with-block closes the file even when an assertion fails, so a
    # failing run does not leave an open handle on TESTFN behind.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # While the file is being iterated, tell() is expected to fail.
            self.assertRaises(IOError, f.tell)
        self.assertEqual(f.tell(), p2)
2117
def test_seeking(self):
    # Write two lines made of an ASCII run two characters shorter than
    # the default chunk size followed by a multi-byte character, then
    # check read()/tell()/readline() around the end of that ASCII run.
    # (Presumably chosen so the multi-byte suffix straddles the first
    # chunk boundary -- confirm against _default_chunk_size().)
    ascii_len = _default_chunk_size() - 2
    head_text = "a" * ascii_len
    head_bytes = head_text.encode("utf-8")
    self.assertEqual(len(head_text), len(head_bytes))
    tail_text = "\u8888\n"
    tail_bytes = tail_text.encode("utf-8")
    with self.open(support.TESTFN, "wb") as out:
        out.write((head_bytes + tail_bytes) * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as txt:
        first = txt.read(ascii_len)
        self.assertEqual(first, head_bytes.decode("ascii"))
        self.assertEqual(txt.tell(), ascii_len)
        self.assertEqual(txt.readline(), tail_text)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002134
def test_seeking_too(self):
    # Regression test for a specific bug: readline() followed by tell()
    # on a multi-byte UTF-8 line with a chunk size of 2 must not fail.
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as out:
        out.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as txt:
        txt._CHUNK_SIZE  # Just test that it exists
        txt._CHUNK_SIZE = 2
        txt.readline()
        txt.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002145
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a with-block so that a failing assertion
        # cannot leak an open handle on TESTFN.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.  (The loop variable is named ``data`` rather
        # than ``input`` to avoid shadowing the builtin.)
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(data)

        # Position each test case so that it crosses a chunk boundary.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(data)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + data, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002192
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002193 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002194 data = "1234567890"
2195 tests = ("utf-16",
2196 "utf-16-le",
2197 "utf-16-be",
2198 "utf-32",
2199 "utf-32-le",
2200 "utf-32-be")
2201 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002202 buf = self.BytesIO()
2203 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002204 # Check if the BOM is written only once (see issue1753).
2205 f.write(data)
2206 f.write(data)
2207 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002208 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002209 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002210 self.assertEqual(f.read(), data * 2)
2211 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002212
Benjamin Petersona1b49012009-03-31 23:11:32 +00002213 def test_unreadable(self):
2214 class UnReadable(self.BytesIO):
2215 def readable(self):
2216 return False
2217 txt = self.TextIOWrapper(UnReadable())
2218 self.assertRaises(IOError, txt.read)
2219
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002220 def test_read_one_by_one(self):
2221 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002222 reads = ""
2223 while True:
2224 c = txt.read(1)
2225 if not c:
2226 break
2227 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002228 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002229
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002230 def test_readlines(self):
2231 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2232 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2233 txt.seek(0)
2234 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2235 txt.seek(0)
2236 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2237
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002238 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002239 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002240 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002241 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002242 reads = ""
2243 while True:
2244 c = txt.read(128)
2245 if not c:
2246 break
2247 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002248 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002249
2250 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002251 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002252
2253 # read one char at a time
2254 reads = ""
2255 while True:
2256 c = txt.read(1)
2257 if not c:
2258 break
2259 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002260 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002261
2262 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002263 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002264 txt._CHUNK_SIZE = 4
2265
2266 reads = ""
2267 while True:
2268 c = txt.read(4)
2269 if not c:
2270 break
2271 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002272 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002273
2274 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002275 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002276 txt._CHUNK_SIZE = 4
2277
2278 reads = txt.read(4)
2279 reads += txt.read(4)
2280 reads += txt.readline()
2281 reads += txt.readline()
2282 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002283 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002284
2285 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002286 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002287 txt._CHUNK_SIZE = 4
2288
2289 reads = txt.read(4)
2290 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002291 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002292
2293 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002294 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002295 txt._CHUNK_SIZE = 4
2296
2297 reads = txt.read(4)
2298 pos = txt.tell()
2299 txt.seek(0)
2300 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002301 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002302
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002303 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002304 buffer = self.BytesIO(self.testdata)
2305 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002306
2307 self.assertEqual(buffer.seekable(), txt.seekable())
2308
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # The position itself is not needed here; tell() is still
            # called so the same code path runs as before, but its result
            # is no longer bound to an unused local.
            f.tell()
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002323
def test_seek_bom(self):
    # The BOM is not written again when seeking manually: overwrite the
    # file tail-first, then from the start, and compare the raw bytes
    # with a single encoding of the final text.
    path = support.TESTFN
    for codec_name in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec_name) as writer:
            writer.write('aaa')
            end_cookie = writer.tell()
        with self.open(path, 'r+', encoding=codec_name) as updater:
            for where, text in ((end_cookie, 'zzz'), (0, 'bbb')):
                updater.seek(where)
                updater.write(text)
        with self.open(path, 'rb') as reader:
            raw_bytes = reader.read()
            self.assertEqual(raw_bytes, 'bbbzzz'.encode(codec_name))
Antoine Pitroue4501852009-05-14 18:55:55 +00002338
def test_errors_property(self):
    # The errors attribute is "strict" unless an errors= argument is
    # passed to open(), in which case it echoes that argument.
    cases = (({}, "strict"), ({"errors": "replace"}, "replace"))
    for extra, expected in cases:
        with self.open(support.TESTFN, "w", **extra) as f:
            self.assertEqual(f.errors, expected)
2344
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    # Twenty threads block on a shared event, each writes one distinct
    # line once released, and every line must appear exactly once.
    gate = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            line = "Thread%03d\n" % n
            gate.wait()
            f.write(line)
        workers = []
        for x in range(20):
            workers.append(threading.Thread(target=run, args=(x,)))
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        gate.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        for n in range(20):
            occurrences = content.count("Thread%03d\n" % n)
            self.assertEqual(occurrences, 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002367
Antoine Pitrou6be88762010-05-03 16:48:20 +00002368 def test_flush_error_on_close(self):
2369 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2370 def bad_flush():
2371 raise IOError()
2372 txt.flush = bad_flush
2373 self.assertRaises(IOError, txt.close) # exception not swallowed
2374
2375 def test_multi_close(self):
2376 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2377 txt.close()
2378 txt.close()
2379 txt.close()
2380 self.assertRaises(ValueError, txt.flush)
2381
def test_unseekable(self):
    # tell() and seek() on a wrapper around an unseekable stream must
    # both raise UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
2386
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002387 def test_readonly_attributes(self):
2388 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2389 buf = self.BytesIO(self.testdata)
2390 with self.assertRaises(AttributeError):
2391 txt.buffer = buf
2392
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    raw = self.MockRawIO(chunks)
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
    # Reads
    first_four = wrapper.read(4)
    self.assertEqual(first_four, 'abcd')
    rest_of_line = wrapper.readline()
    self.assertEqual(rest_of_line, 'efghi\n')
    remaining = [line for line in wrapper]
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
2403
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    raw = self.MockRawIO(chunks)
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for text in ('1', '23\n4', '5'):
        wrapper.write(text)
    # No explicit flush: the data must already be on the raw object.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2413
class CTextIOWrapperTest(TextIOWrapperTest):

    def test_initialization(self):
        # Re-running __init__ with an invalid newline argument must raise
        # and leave the wrapper unusable (read() raises ValueError).
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        for error, bad_newline in ((TypeError, 42), (ValueError, 'xyzzy')):
            self.assertRaises(error, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(rawio)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Self-reference: only the cycle collector can free the object.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as f:
            on_disk = f.read()
            self.assertEqual(on_disk, b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()
2454
2455
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Adds no tests of its own; it only inherits those of
    # TextIOWrapperTest.
    pass
2458
2459
class IncrementalNewlineDecoderTest(unittest.TestCase):

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # decode once, rewind to the saved state, and decode again.
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        final = {"final": True}

        # Multi-byte character fed whole, then byte by byte (twice), then
        # left truncated after its first byte.
        before_reset = (
            (b'\xe8\xa2\x88', "\u8888", {}),

            (b'\xe8', "", {}),
            (b'\xa2', "", {}),
            (b'\x88', "\u8888", {}),

            (b'\xe8', "", {}),
            (b'\xa2', "", {}),
            (b'\x88', "\u8888", {}),

            (b'\xe8', "", {}),
        )
        for data, expected, extra in before_reset:
            _check_decode(data, expected, **extra)
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        after_reset = (
            (b'\n', "\n", {}),
            (b'\r', "", {}),
            (b'', "\n", final),
            (b'\r', "\n", final),

            (b'\r', "", {}),
            (b'a', "\na", {}),

            (b'\r\r\n', "\n\n", {}),
            (b'\r', "", {}),
            (b'\r', "\n", {}),
            (b'\na', "\na", {}),

            (b'\xe8\xa2\x88\r\n', "\u8888\n", {}),
            (b'\xe8\xa2\x88', "\u8888", {}),
            (b'\n', "\n", {}),
            (b'\xe8\xa2\x88\r', "\u8888", {}),
            (b'\n', "\n", {}),
        )
        for data, expected, extra in after_reset:
            _check_decode(data, expected, **extra)

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to the decoder one unit at a time and check both the
        # accumulated output and the decoder's newlines attribute.
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _units(s):
                # Decode one byte at a time
                return (bytes([b]) for b in encoder.encode(s))
        else:
            encoder = None
            def _units(s):
                # Decode one char at a time
                return iter(s)

        def _decode_bytewise(s):
            for unit in _units(s):
                result.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        # (text to feed, value of decoder.newlines expected afterwards)
        steps = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in steps:
            _decode_bytewise(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            inner = codecs.getincrementaldecoder(enc)() if enc else enc
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
        for translate in (False, True):
            _check(self.IncrementalNewlineDecoder(None, translate=translate))
2565
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds no tests of its own; it only inherits those of
    # IncrementalNewlineDecoderTest.
    pass
2568
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds no tests of its own; it only inherits those of
    # IncrementalNewlineDecoderTest.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002571
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002572
Guido van Rossum01a27522007-03-07 01:00:12 +00002573# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002574
Guido van Rossum5abbf752007-08-27 17:39:33 +00002575class MiscIOTest(unittest.TestCase):
2576
def tearDown(self):
    # Remove the scratch file used by the tests.
    scratch = support.TESTFN
    support.unlink(scratch)
2579
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002580 def test___all__(self):
2581 for name in self.io.__all__:
2582 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002583 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002584 if name == "open":
2585 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002586 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002587 self.assertTrue(issubclass(obj, Exception), name)
2588 elif not name.startswith("SEEK_"):
2589 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002590
def test_attributes(self):
    # Check the mode and name attributes reported by each layer of the
    # objects returned by open() for several modes.
    path = support.TESTFN

    unbuffered = self.open(path, "wb", buffering=0)
    self.assertEqual(unbuffered.mode, "wb")
    unbuffered.close()

    text = self.open(path, "U")
    for layer in (text, text.buffer, text.buffer.raw):
        self.assertEqual(layer.name, path)
    self.assertEqual(text.mode, "U")
    self.assertEqual(text.buffer.mode, "rb")
    self.assertEqual(text.buffer.raw.mode, "rb")
    text.close()

    updating = self.open(path, "w+")
    self.assertEqual(updating.mode, "w+")
    self.assertEqual(updating.buffer.mode, "rb+") # Does it really matter?
    self.assertEqual(updating.buffer.raw.mode, "rb+")

    # A second object on the same descriptor reports the descriptor
    # number as its name.
    by_fd = self.open(updating.fileno(), "wb", closefd=False)
    for layer in (by_fd, by_fd.raw):
        self.assertEqual(layer.mode, "wb")
    for layer in (by_fd, by_fd.raw):
        self.assertEqual(layer.name, updating.fileno())
    updating.close()
    by_fd.close()
2617
def test_io_after_close(self):
    # After close(), every I/O method must raise ValueError, for text,
    # buffered and unbuffered files in write, read and update modes.
    variants = []
    for base in ("w", "r", "w+"):
        variants += [
            {"mode": base},
            {"mode": base + "b"},
            {"mode": base, "buffering": 1},
            {"mode": base, "buffering": 2},
            {"mode": base + "b", "buffering": 0},
        ]
    for kwargs in variants:
        f = self.open(support.TESTFN, **kwargs)
        f.close()
        empty = b"" if "b" in kwargs['mode'] else ""
        # (method name, call arguments, skip when the object lacks it)
        checks = (
            ("flush", (), False),
            ("fileno", (), False),
            ("isatty", (), False),
            ("__iter__", (), False),
            ("peek", (1,), True),
            ("read", (), False),
            ("read1", (1024,), True),
            ("readall", (), True),
            ("readinto", (bytearray(1024),), True),
            ("readline", (), False),
            ("readlines", (), False),
            ("seek", (0,), False),
            ("tell", (), False),
            ("truncate", (), False),
            ("write", (empty,), False),
            ("writelines", ([],), False),
        )
        for attr, call_args, optional in checks:
            if optional and not hasattr(f, attr):
                continue
            self.assertRaises(ValueError, getattr(f, attr), *call_args)
        self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002660
def test_blockingioerror(self):
    # Various BlockingIOError issues
    # A reference cycle between a BlockingIOError and its argument must
    # be reclaimed by the garbage collector.
    class C(str):
        pass
    payload = C("")
    error = self.BlockingIOError(1, payload)
    payload.b = error
    error.c = payload
    ref = weakref.ref(payload)
    del payload, error
    support.gc_collect()
    self.assertIsNone(ref(), ref)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002673
2674 def test_abcs(self):
2675 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002676 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2677 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2678 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2679 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002680
def _check_abc_inheritance(self, abcmodule):
    # For an unbuffered binary, a buffered binary and a text file, check
    # that the object is an IOBase from *abcmodule* and an instance of
    # exactly one of its RawIOBase / BufferedIOBase / TextIOBase.
    layouts = (
        ("wb", {"buffering": 0}, "RawIOBase"),
        ("wb", {}, "BufferedIOBase"),
        ("w", {}, "TextIOBase"),
    )
    for mode, extra, own_abc in layouts:
        with self.open(support.TESTFN, mode, **extra) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            for abc_name in ("RawIOBase", "BufferedIOBase", "TextIOBase"):
                if abc_name == own_abc:
                    check = self.assertIsInstance
                else:
                    check = self.assertNotIsInstance
                check(f, getattr(abcmodule, abc_name))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002697
def test_abc_inheritance(self):
    # Test implementations inherit from their respective ABCs.
    # The test case itself is passed as the "module": the ABCs are
    # looked up as attributes of self.
    abc_source = self
    self._check_abc_inheritance(abc_source)
2701
def test_abc_inheritance_official(self):
    # Test implementations inherit from the official ABCs of the
    # baseline "io" module.
    abc_source = io
    self._check_abc_inheritance(abc_source)
2706
def _check_warn_on_dealloc(self, *args, **kwargs):
    # Open a file, drop the only reference without closing it, and check
    # that a ResourceWarning mentioning the object's repr is emitted.
    # NOTE(review): this calls the builtin open(), not self.open(), so
    # every subclass exercises the same implementation -- confirm that
    # this is intended.
    leaked = open(*args, **kwargs)
    description = repr(leaked)
    with self.assertWarns(ResourceWarning) as caught:
        leaked = None
        support.gc_collect()
    message = str(caught.warning.args[0])
    self.assertIn(description, message)
2714
def test_warn_on_dealloc(self):
    # Run the dealloc-warning check for an unbuffered binary, a buffered
    # binary and a text file.
    for mode, extra in (("wb", {"buffering": 0}), ("wb", {}), ("w", {})):
        self._check_warn_on_dealloc(support.TESTFN, mode, **extra)
2719
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
    """Check ResourceWarning behaviour for files opened from a descriptor.

    *args* and *kwargs* are forwarded to open() after the descriptor.
    Two things are verified, each on the read end of a fresh pipe:

    * dropping the unclosed file emits a ResourceWarning;
    * with ``closefd=False`` no warning is recorded.

    Every pipe descriptor created here is closed by a cleanup callback.
    """
    fds = []
    def cleanup_fds():
        for fd in fds:
            try:
                os.close(fd)
            except EnvironmentError as e:
                # A descriptor already closed by its file object is fine.
                if e.errno != errno.EBADF:
                    raise
    self.addCleanup(cleanup_fds)
    r, w = os.pipe()
    fds += r, w
    self._check_warn_on_dealloc(r, *args, **kwargs)
    # When using closefd=False, there's no warning
    r, w = os.pipe()
    fds += r, w
    with warnings.catch_warnings(record=True) as recorded:
        # Force ResourceWarning to be recorded regardless of the filters
        # currently in effect; otherwise an "ignore" filter would make the
        # assertion below pass without checking anything.
        warnings.simplefilter("always", ResourceWarning)
        open(r, *args, closefd=False, **kwargs)
        support.gc_collect()
    self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002740
def test_warn_on_dealloc_fd(self):
    # Same check for an unbuffered binary, a buffered binary and a text
    # reader, in that order.
    variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, extra in variants:
        self._check_warn_on_dealloc_fd(mode, **extra)
2745
2746
def test_pickling(self):
    # Pickling file objects is forbidden, whatever the mode, the buffering
    # or the pickle protocol.
    # Each base mode is tried as text, buffered binary and unbuffered
    # binary.  The "w" variants come first, so the file has already been
    # written to by the time the "r" variants open it.
    open_kwargs = []
    for base in ("w", "r", "w+"):
        binary = base + "b"
        open_kwargs.append({"mode": base})
        open_kwargs.append({"mode": binary})
        open_kwargs.append({"mode": binary, "buffering": 0})
    for kwargs in open_kwargs:
        for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.open(support.TESTFN, **kwargs) as f:
                with self.assertRaises(TypeError):
                    pickle.dumps(f, protocol)
2763
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_bigbuf(self):
    # Non-blocking pipe write scenario with a 16 KiB buffer on both ends.
    self._test_nonblock_pipe_write(16*1024)
2767
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_smallbuf(self):
    # Non-blocking pipe write scenario with a 1 KiB buffer on both ends.
    self._test_nonblock_pipe_write(1024)
2771
def _set_non_blocking(self, fd):
    """Add O_NONBLOCK to the status flags of file descriptor *fd*."""
    current = fcntl.fcntl(fd, fcntl.F_GETFL)
    self.assertNotEqual(current, -1)
    wanted = current | os.O_NONBLOCK
    status = fcntl.fcntl(fd, fcntl.F_SETFL, wanted)
    self.assertEqual(status, 0)
2777
def _test_nonblock_pipe_write(self, bufsize):
    """Write through a non-blocking pipe and check nothing is lost.

    Both ends of a pipe are made non-blocking and wrapped with
    ``self.open`` using *bufsize* as the buffer size.  Data is written
    until the writer raises BlockingIOError, the reader is drained, and
    at the end the concatenation of everything sent must equal the
    concatenation of everything received.
    """
    sent = []
    received = []
    r, w = os.pipe()
    self._set_non_blocking(r)
    self._set_non_blocking(w)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes. For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
    wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

    with rf, wf:
        for N in 9999, 73, 7574:
            try:
                i = 0
                while True:
                    # N copies of one lowercase ASCII letter, cycling a..z.
                    msg = bytes([i % 26 + 97]) * N
                    sent.append(msg)
                    wf.write(msg)
                    i += 1

            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                # Only the reported prefix of the last message counts as
                # sent.
                sent[-1] = sent[-1][:e.characters_written]
                received.append(rf.read())
                msg = b'BLOCKED'
                wf.write(msg)
                sent.append(msg)

        # Keep flushing until it succeeds, draining the reader each time
        # the writer reports that it would block.
        while True:
            try:
                wf.flush()
                break
            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                self.assertEqual(e.characters_written, 0)
                received.append(rf.read())

        # Call rf.read() repeatedly until it returns None.
        received += iter(rf.read, None)

    sent, received = b''.join(sent), b''.join(received)
    self.assertTrue(sent == received)
    # Leaving the with block must have closed both file objects.
    self.assertTrue(wf.closed)
    self.assertTrue(rf.closed)
2827
# Runs the MiscIOTest cases with the "io" module as the implementation
# under test.
class CMiscIOTest(MiscIOTest):
    io = io
2830
# Runs the MiscIOTest cases with the "pyio" module as the implementation
# under test.
class PyMiscIOTest(MiscIOTest):
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002833
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002834
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    # Tests for the interaction between SIGALRM delivery and reads/writes
    # on pipes.  Subclasses provide the implementation under test through
    # the ``io`` class attribute.

    def setUp(self):
        # Install the failing handler and remember the previous one.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        # Cancel any alarm a failed test left pending *before* restoring
        # the previous handler; otherwise a late SIGALRM would be delivered
        # to whatever handler was installed before the test.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError from the signal handler.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is repeated to build the payload that is written, *bytes*
        is what the first two bytes read back from the pipe are compared
        against, and *fdopen_kwargs* are passed to ``self.io.open`` (with
        ``closefd`` forced to False)."""
        read_results = []
        def _read():
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # Bound before the try block so that the cleanup below never hits
        # an unbound name if open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above. Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                        wio.write, item * (support.PIPE_MAX_SIZE // len(item)))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check what happens when a signal handler writes to the same
        file object that the interrupted code is writing to.

        *data* is written repeatedly through ``self.io.open`` (opened with
        *fdopen_kwargs*) until the alarm fires.  The handler writes *data*
        again and then raises ZeroDivisionError; either that exception or
        a RuntimeError starting with "reentrant call" must surface."""
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by ``read(6)`` to ``str`` for
        comparison; *fdopen_kwargs* are passed to ``self.io.open`` (with
        ``closefd`` forced to False)."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that the cleanup below never hits
        # an unbound name if open() itself fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is a one-element str or bytes repeated N times to build the
        payload; *fdopen_kwargs* are passed to ``self.io.open`` (with
        ``closefd`` forced to False)."""
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = 1024 * 1024
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        def _read():
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: re-arm the timer with alarm2 as the handler.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start the reader so the write can complete.
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that the cleanup below never hits
        # an unbound name if open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00003025
# Runs the SignalsTest cases with the "io" module as the implementation
# under test.
class CSignalsTest(SignalsTest):
    io = io
3028
# Runs the SignalsTest cases with the "pyio" module as the implementation
# under test.
class PySignalsTest(SignalsTest):
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    # The inherited test names are rebound to None (presumably so that the
    # test loader no longer treats them as test methods).
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
3036
Antoine Pitroub46b9d52010-08-21 19:09:32 +00003037
def test_main():
    """Inject the implementation namespaces into the test classes and run them.

    Classes whose name starts with "C" receive the names exported by ``io``
    plus the "C"-prefixed mock classes; classes whose name starts with "Py"
    receive the names exported by ``pyio`` plus the "Py"-prefixed mocks.
    Other classes are left untouched.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    exported = io.__all__ + ["IncrementalNewlineDecoder"]
    module_globals = globals()

    c_namespace = dict((attr, getattr(io, attr)) for attr in exported)
    py_namespace = dict((attr, getattr(pyio, attr)) for attr in exported)
    for mock in mocks:
        c_namespace[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mocks:
        py_namespace[mock.__name__] = module_globals["Py" + mock.__name__]
    # Avoid turning open into a bound method.
    py_namespace["open"] = pyio.OpenWrapper

    for test_class in tests:
        class_name = test_class.__name__
        if class_name.startswith("C"):
            namespace = c_namespace
        elif class_name.startswith("Py"):
            namespace = py_namespace
        else:
            continue
        for attr, value in namespace.items():
            setattr(test_class, attr, value)

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00003072
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()