blob: a9e3c37c719f2338e842681ecbbac06766bf8888 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000045
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000047def _default_chunk_size():
48 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000049 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000050 return f._CHUNK_SIZE
51
52
Antoine Pitrou328ec742010-09-14 18:37:24 +000053class MockRawIOWithoutRead:
54 """A RawIO implementation without read(), so as to exercise the default
55 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000056
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000057 def __init__(self, read_stack=()):
58 self._read_stack = list(read_stack)
59 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000060 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000061 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000062
Guido van Rossum01a27522007-03-07 01:00:12 +000063 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000065 return len(b)
66
67 def writable(self):
68 return True
69
Guido van Rossum68bbcd22007-02-27 17:19:33 +000070 def fileno(self):
71 return 42
72
73 def readable(self):
74 return True
75
Guido van Rossum01a27522007-03-07 01:00:12 +000076 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000077 return True
78
Guido van Rossum01a27522007-03-07 01:00:12 +000079 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000080 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000081
82 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000083 return 0 # same comment as above
84
85 def readinto(self, buf):
86 self._reads += 1
87 max_len = len(buf)
88 try:
89 data = self._read_stack[0]
90 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000091 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000092 return 0
93 if data is None:
94 del self._read_stack[0]
95 return None
96 n = len(data)
97 if len(data) <= max_len:
98 del self._read_stack[0]
99 buf[:n] = data
100 return n
101 else:
102 buf[:] = data[:max_len]
103 self._read_stack[0] = data[max_len:]
104 return max_len
105
106 def truncate(self, pos=None):
107 return pos
108
Antoine Pitrou328ec742010-09-14 18:37:24 +0000109class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
110 pass
111
112class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
113 pass
114
115
116class MockRawIO(MockRawIOWithoutRead):
117
118 def read(self, n=None):
119 self._reads += 1
120 try:
121 return self._read_stack.pop(0)
122 except:
123 self._extraneous_reads += 1
124 return b""
125
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000126class CMockRawIO(MockRawIO, io.RawIOBase):
127 pass
128
129class PyMockRawIO(MockRawIO, pyio.RawIOBase):
130 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000131
Guido van Rossuma9e20242007-03-08 00:43:48 +0000132
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000133class MisbehavedRawIO(MockRawIO):
134 def write(self, b):
135 return super().write(b) * 2
136
137 def read(self, n=None):
138 return super().read(n) * 2
139
140 def seek(self, pos, whence):
141 return -123
142
143 def tell(self):
144 return -456
145
146 def readinto(self, buf):
147 super().readinto(buf)
148 return len(buf) * 5
149
150class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
151 pass
152
153class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
154 pass
155
156
157class CloseFailureIO(MockRawIO):
158 closed = 0
159
160 def close(self):
161 if not self.closed:
162 self.closed = 1
163 raise IOError
164
165class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
166 pass
167
168class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
169 pass
170
171
172class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000173
174 def __init__(self, data):
175 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000176 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000177
178 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000179 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000180 self.read_history.append(None if res is None else len(res))
181 return res
182
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000183 def readinto(self, b):
184 res = super().readinto(b)
185 self.read_history.append(res)
186 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188class CMockFileIO(MockFileIO, io.BytesIO):
189 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000190
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000191class PyMockFileIO(MockFileIO, pyio.BytesIO):
192 pass
193
194
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000195class MockUnseekableIO:
196 def seekable(self):
197 return False
198
199 def seek(self, *args):
200 raise self.UnsupportedOperation("not seekable")
201
202 def tell(self, *args):
203 raise self.UnsupportedOperation("not seekable")
204
205class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
206 UnsupportedOperation = io.UnsupportedOperation
207
208class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
209 UnsupportedOperation = pyio.UnsupportedOperation
210
211
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000212class MockNonBlockWriterIO:
213
214 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000215 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000216 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000217
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000218 def pop_written(self):
219 s = b"".join(self._write_stack)
220 self._write_stack[:] = []
221 return s
222
223 def block_on(self, char):
224 """Block when a given char is encountered."""
225 self._blocker_char = char
226
227 def readable(self):
228 return True
229
230 def seekable(self):
231 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000232
Guido van Rossum01a27522007-03-07 01:00:12 +0000233 def writable(self):
234 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000235
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000236 def write(self, b):
237 b = bytes(b)
238 n = -1
239 if self._blocker_char:
240 try:
241 n = b.index(self._blocker_char)
242 except ValueError:
243 pass
244 else:
245 self._blocker_char = None
246 self._write_stack.append(b[:n])
247 raise self.BlockingIOError(0, "test blocking", n)
248 self._write_stack.append(b)
249 return len(b)
250
251class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
252 BlockingIOError = io.BlockingIOError
253
254class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
255 BlockingIOError = pyio.BlockingIOError
256
Guido van Rossuma9e20242007-03-08 00:43:48 +0000257
Guido van Rossum28524c72007-02-27 05:47:44 +0000258class IOTest(unittest.TestCase):
259
Neal Norwitze7789b12008-03-24 06:18:09 +0000260 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000261 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000262
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000263 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000264 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000265
Guido van Rossum28524c72007-02-27 05:47:44 +0000266 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000267 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000268 f.truncate(0)
269 self.assertEqual(f.tell(), 5)
270 f.seek(0)
271
272 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000273 self.assertEqual(f.seek(0), 0)
274 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000275 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000276 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000277 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000278 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000279 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000280 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000281 self.assertEqual(f.seek(-1, 2), 13)
282 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000283
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000285 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000286 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000287
Guido van Rossum9b76da62007-04-11 01:09:03 +0000288 def read_ops(self, f, buffered=False):
289 data = f.read(5)
290 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000291 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000292 self.assertEqual(f.readinto(data), 5)
293 self.assertEqual(data, b" worl")
294 self.assertEqual(f.readinto(data), 2)
295 self.assertEqual(len(data), 5)
296 self.assertEqual(data[:2], b"d\n")
297 self.assertEqual(f.seek(0), 0)
298 self.assertEqual(f.read(20), b"hello world\n")
299 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000300 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000301 self.assertEqual(f.seek(-6, 2), 6)
302 self.assertEqual(f.read(5), b"world")
303 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000304 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000305 self.assertEqual(f.seek(-6, 1), 5)
306 self.assertEqual(f.read(5), b" worl")
307 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000308 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000309 if buffered:
310 f.seek(0)
311 self.assertEqual(f.read(), b"hello world\n")
312 f.seek(6)
313 self.assertEqual(f.read(), b"world\n")
314 self.assertEqual(f.read(), b"")
315
Guido van Rossum34d69e52007-04-10 20:08:41 +0000316 LARGE = 2**31
317
Guido van Rossum53807da2007-04-10 19:01:47 +0000318 def large_file_ops(self, f):
319 assert f.readable()
320 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000321 self.assertEqual(f.seek(self.LARGE), self.LARGE)
322 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000323 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000324 self.assertEqual(f.tell(), self.LARGE + 3)
325 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000326 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000327 self.assertEqual(f.tell(), self.LARGE + 2)
328 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000329 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000330 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000331 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
332 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000333 self.assertEqual(f.read(2), b"x")
334
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000335 def test_invalid_operations(self):
336 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000337 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000338 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000339 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000340 self.assertRaises(exc, fp.read)
341 self.assertRaises(exc, fp.readline)
342 with self.open(support.TESTFN, "wb", buffering=0) as fp:
343 self.assertRaises(exc, fp.read)
344 self.assertRaises(exc, fp.readline)
345 with self.open(support.TESTFN, "rb", buffering=0) as fp:
346 self.assertRaises(exc, fp.write, b"blah")
347 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000348 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000349 self.assertRaises(exc, fp.write, b"blah")
350 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000351 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000352 self.assertRaises(exc, fp.write, "blah")
353 self.assertRaises(exc, fp.writelines, ["blah\n"])
354 # Non-zero seeking from current or end pos
355 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
356 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000357
Guido van Rossum28524c72007-02-27 05:47:44 +0000358 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000359 with self.open(support.TESTFN, "wb", buffering=0) as f:
360 self.assertEqual(f.readable(), False)
361 self.assertEqual(f.writable(), True)
362 self.assertEqual(f.seekable(), True)
363 self.write_ops(f)
364 with self.open(support.TESTFN, "rb", buffering=0) as f:
365 self.assertEqual(f.readable(), True)
366 self.assertEqual(f.writable(), False)
367 self.assertEqual(f.seekable(), True)
368 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000369
Guido van Rossum87429772007-04-10 21:06:59 +0000370 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000371 with self.open(support.TESTFN, "wb") as f:
372 self.assertEqual(f.readable(), False)
373 self.assertEqual(f.writable(), True)
374 self.assertEqual(f.seekable(), True)
375 self.write_ops(f)
376 with self.open(support.TESTFN, "rb") as f:
377 self.assertEqual(f.readable(), True)
378 self.assertEqual(f.writable(), False)
379 self.assertEqual(f.seekable(), True)
380 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000381
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000382 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000383 with self.open(support.TESTFN, "wb") as f:
384 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
385 with self.open(support.TESTFN, "rb") as f:
386 self.assertEqual(f.readline(), b"abc\n")
387 self.assertEqual(f.readline(10), b"def\n")
388 self.assertEqual(f.readline(2), b"xy")
389 self.assertEqual(f.readline(4), b"zzy\n")
390 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000391 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000392 self.assertRaises(TypeError, f.readline, 5.3)
393 with self.open(support.TESTFN, "r") as f:
394 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000395
Guido van Rossum28524c72007-02-27 05:47:44 +0000396 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000397 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000398 self.write_ops(f)
399 data = f.getvalue()
400 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000401 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000402 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000403
Guido van Rossum53807da2007-04-10 19:01:47 +0000404 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000405 # On Windows and Mac OSX this test comsumes large resources; It takes
406 # a long time to build the >2GB file and takes >2GB of disk space
407 # therefore the resource must be enabled to run this test.
408 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000409 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000410 print("\nTesting large file ops skipped on %s." % sys.platform,
411 file=sys.stderr)
412 print("It requires %d bytes and a long time." % self.LARGE,
413 file=sys.stderr)
414 print("Use 'regrtest.py -u largefile test_io' to run it.",
415 file=sys.stderr)
416 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000417 with self.open(support.TESTFN, "w+b", 0) as f:
418 self.large_file_ops(f)
419 with self.open(support.TESTFN, "w+b") as f:
420 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000421
422 def test_with_open(self):
423 for bufsize in (0, 1, 100):
424 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000425 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000426 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000427 self.assertEqual(f.closed, True)
428 f = None
429 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000430 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000431 1/0
432 except ZeroDivisionError:
433 self.assertEqual(f.closed, True)
434 else:
435 self.fail("1/0 didn't raise an exception")
436
Antoine Pitrou08838b62009-01-21 00:55:13 +0000437 # issue 5008
438 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000439 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000440 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000441 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000442 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000443 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000444 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000445 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000446 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000447
Guido van Rossum87429772007-04-10 21:06:59 +0000448 def test_destructor(self):
449 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000450 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000451 def __del__(self):
452 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 try:
454 f = super().__del__
455 except AttributeError:
456 pass
457 else:
458 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000459 def close(self):
460 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000461 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000462 def flush(self):
463 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000464 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000465 with support.check_warnings(('', ResourceWarning)):
466 f = MyFileIO(support.TESTFN, "wb")
467 f.write(b"xxx")
468 del f
469 support.gc_collect()
470 self.assertEqual(record, [1, 2, 3])
471 with self.open(support.TESTFN, "rb") as f:
472 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000473
474 def _check_base_destructor(self, base):
475 record = []
476 class MyIO(base):
477 def __init__(self):
478 # This exercises the availability of attributes on object
479 # destruction.
480 # (in the C version, close() is called by the tp_dealloc
481 # function, not by __del__)
482 self.on_del = 1
483 self.on_close = 2
484 self.on_flush = 3
485 def __del__(self):
486 record.append(self.on_del)
487 try:
488 f = super().__del__
489 except AttributeError:
490 pass
491 else:
492 f()
493 def close(self):
494 record.append(self.on_close)
495 super().close()
496 def flush(self):
497 record.append(self.on_flush)
498 super().flush()
499 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000500 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000501 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000502 self.assertEqual(record, [1, 2, 3])
503
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000504 def test_IOBase_destructor(self):
505 self._check_base_destructor(self.IOBase)
506
507 def test_RawIOBase_destructor(self):
508 self._check_base_destructor(self.RawIOBase)
509
510 def test_BufferedIOBase_destructor(self):
511 self._check_base_destructor(self.BufferedIOBase)
512
513 def test_TextIOBase_destructor(self):
514 self._check_base_destructor(self.TextIOBase)
515
Guido van Rossum87429772007-04-10 21:06:59 +0000516 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000517 with self.open(support.TESTFN, "wb") as f:
518 f.write(b"xxx")
519 with self.open(support.TESTFN, "rb") as f:
520 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000521
Guido van Rossumd4103952007-04-12 05:44:49 +0000522 def test_array_writes(self):
523 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000524 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000525 with self.open(support.TESTFN, "wb", 0) as f:
526 self.assertEqual(f.write(a), n)
527 with self.open(support.TESTFN, "wb") as f:
528 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000529
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000530 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000531 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000532 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000533
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000534 def test_read_closed(self):
535 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000536 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000537 with self.open(support.TESTFN, "r") as f:
538 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000539 self.assertEqual(file.read(), "egg\n")
540 file.seek(0)
541 file.close()
542 self.assertRaises(ValueError, file.read)
543
544 def test_no_closefd_with_filename(self):
545 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000546 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000547
548 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000549 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000550 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000551 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000552 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000553 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000554 self.assertEqual(file.buffer.raw.closefd, False)
555
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000556 def test_garbage_collection(self):
557 # FileIO objects are collected, and collecting them flushes
558 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000559 with support.check_warnings(('', ResourceWarning)):
560 f = self.FileIO(support.TESTFN, "wb")
561 f.write(b"abcxxx")
562 f.f = f
563 wr = weakref.ref(f)
564 del f
565 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000566 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000567 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000568 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000569
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000570 def test_unbounded_file(self):
571 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
572 zero = "/dev/zero"
573 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000574 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000575 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000576 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000577 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000578 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000579 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000580 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000581 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000582 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000583 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000584 self.assertRaises(OverflowError, f.read)
585
Antoine Pitrou6be88762010-05-03 16:48:20 +0000586 def test_flush_error_on_close(self):
587 f = self.open(support.TESTFN, "wb", buffering=0)
588 def bad_flush():
589 raise IOError()
590 f.flush = bad_flush
591 self.assertRaises(IOError, f.close) # exception not swallowed
592
593 def test_multi_close(self):
594 f = self.open(support.TESTFN, "wb", buffering=0)
595 f.close()
596 f.close()
597 f.close()
598 self.assertRaises(ValueError, f.flush)
599
Antoine Pitrou328ec742010-09-14 18:37:24 +0000600 def test_RawIOBase_read(self):
601 # Exercise the default RawIOBase.read() implementation (which calls
602 # readinto() internally).
603 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
604 self.assertEqual(rawio.read(2), b"ab")
605 self.assertEqual(rawio.read(2), b"c")
606 self.assertEqual(rawio.read(2), b"d")
607 self.assertEqual(rawio.read(2), None)
608 self.assertEqual(rawio.read(2), b"ef")
609 self.assertEqual(rawio.read(2), b"g")
610 self.assertEqual(rawio.read(2), None)
611 self.assertEqual(rawio.read(2), b"")
612
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400613 def test_types_have_dict(self):
614 test = (
615 self.IOBase(),
616 self.RawIOBase(),
617 self.TextIOBase(),
618 self.StringIO(),
619 self.BytesIO()
620 )
621 for obj in test:
622 self.assertTrue(hasattr(obj, "__dict__"))
623
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000624class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200625
626 def test_IOBase_finalize(self):
627 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
628 # class which inherits IOBase and an object of this class are caught
629 # in a reference cycle and close() is already in the method cache.
630 class MyIO(self.IOBase):
631 def close(self):
632 pass
633
634 # create an instance to populate the method cache
635 MyIO()
636 obj = MyIO()
637 obj.obj = obj
638 wr = weakref.ref(obj)
639 del MyIO
640 del obj
641 support.gc_collect()
642 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000643
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000644class PyIOTest(IOTest):
645 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000646
Guido van Rossuma9e20242007-03-08 00:43:48 +0000647
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000648class CommonBufferedTests:
649 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
650
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000651 def test_detach(self):
652 raw = self.MockRawIO()
653 buf = self.tp(raw)
654 self.assertIs(buf.detach(), raw)
655 self.assertRaises(ValueError, buf.detach)
656
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000657 def test_fileno(self):
658 rawio = self.MockRawIO()
659 bufio = self.tp(rawio)
660
Ezio Melottib3aedd42010-11-20 19:04:17 +0000661 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000662
663 def test_no_fileno(self):
664 # XXX will we always have fileno() function? If so, kill
665 # this test. Else, write it.
666 pass
667
668 def test_invalid_args(self):
669 rawio = self.MockRawIO()
670 bufio = self.tp(rawio)
671 # Invalid whence
672 self.assertRaises(ValueError, bufio.seek, 0, -1)
673 self.assertRaises(ValueError, bufio.seek, 0, 3)
674
675 def test_override_destructor(self):
676 tp = self.tp
677 record = []
678 class MyBufferedIO(tp):
679 def __del__(self):
680 record.append(1)
681 try:
682 f = super().__del__
683 except AttributeError:
684 pass
685 else:
686 f()
687 def close(self):
688 record.append(2)
689 super().close()
690 def flush(self):
691 record.append(3)
692 super().flush()
693 rawio = self.MockRawIO()
694 bufio = MyBufferedIO(rawio)
695 writable = bufio.writable()
696 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000697 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000698 if writable:
699 self.assertEqual(record, [1, 2, 3])
700 else:
701 self.assertEqual(record, [1, 2])
702
703 def test_context_manager(self):
704 # Test usability as a context manager
705 rawio = self.MockRawIO()
706 bufio = self.tp(rawio)
707 def _with():
708 with bufio:
709 pass
710 _with()
711 # bufio should now be closed, and using it a second time should raise
712 # a ValueError.
713 self.assertRaises(ValueError, _with)
714
715 def test_error_through_destructor(self):
716 # Test that the exception state is not modified by a destructor,
717 # even if close() fails.
718 rawio = self.CloseFailureIO()
719 def f():
720 self.tp(rawio).xyzzy
721 with support.captured_output("stderr") as s:
722 self.assertRaises(AttributeError, f)
723 s = s.getvalue().strip()
724 if s:
725 # The destructor *may* have printed an unraisable error, check it
726 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000727 self.assertTrue(s.startswith("Exception IOError: "), s)
728 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000729
Antoine Pitrou716c4442009-05-23 19:04:03 +0000730 def test_repr(self):
731 raw = self.MockRawIO()
732 b = self.tp(raw)
733 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
734 self.assertEqual(repr(b), "<%s>" % clsname)
735 raw.name = "dummy"
736 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
737 raw.name = b"dummy"
738 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
739
Antoine Pitrou6be88762010-05-03 16:48:20 +0000740 def test_flush_error_on_close(self):
741 raw = self.MockRawIO()
742 def bad_flush():
743 raise IOError()
744 raw.flush = bad_flush
745 b = self.tp(raw)
746 self.assertRaises(IOError, b.close) # exception not swallowed
747
748 def test_multi_close(self):
749 raw = self.MockRawIO()
750 b = self.tp(raw)
751 b.close()
752 b.close()
753 b.close()
754 self.assertRaises(ValueError, b.flush)
755
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000756 def test_unseekable(self):
757 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
758 self.assertRaises(self.UnsupportedOperation, bufio.tell)
759 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
760
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000761 def test_readonly_attributes(self):
762 raw = self.MockRawIO()
763 buf = self.tp(raw)
764 x = self.MockRawIO()
765 with self.assertRaises(AttributeError):
766 buf.raw = x
767
Guido van Rossum78892e42007-04-06 17:31:18 +0000768
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000769class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
770 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000771
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000772 def test_constructor(self):
773 rawio = self.MockRawIO([b"abc"])
774 bufio = self.tp(rawio)
775 bufio.__init__(rawio)
776 bufio.__init__(rawio, buffer_size=1024)
777 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000778 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000779 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
780 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
781 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
782 rawio = self.MockRawIO([b"abc"])
783 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000784 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000785
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000786 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000787 for arg in (None, 7):
788 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
789 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000790 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000791 # Invalid args
792 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000793
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000794 def test_read1(self):
795 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
796 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000797 self.assertEqual(b"a", bufio.read(1))
798 self.assertEqual(b"b", bufio.read1(1))
799 self.assertEqual(rawio._reads, 1)
800 self.assertEqual(b"c", bufio.read1(100))
801 self.assertEqual(rawio._reads, 1)
802 self.assertEqual(b"d", bufio.read1(100))
803 self.assertEqual(rawio._reads, 2)
804 self.assertEqual(b"efg", bufio.read1(100))
805 self.assertEqual(rawio._reads, 3)
806 self.assertEqual(b"", bufio.read1(100))
807 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000808 # Invalid args
809 self.assertRaises(ValueError, bufio.read1, -1)
810
811 def test_readinto(self):
812 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
813 bufio = self.tp(rawio)
814 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000815 self.assertEqual(bufio.readinto(b), 2)
816 self.assertEqual(b, b"ab")
817 self.assertEqual(bufio.readinto(b), 2)
818 self.assertEqual(b, b"cd")
819 self.assertEqual(bufio.readinto(b), 2)
820 self.assertEqual(b, b"ef")
821 self.assertEqual(bufio.readinto(b), 1)
822 self.assertEqual(b, b"gf")
823 self.assertEqual(bufio.readinto(b), 0)
824 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +0200825 rawio = self.MockRawIO((b"abc", None))
826 bufio = self.tp(rawio)
827 self.assertEqual(bufio.readinto(b), 2)
828 self.assertEqual(b, b"ab")
829 self.assertEqual(bufio.readinto(b), 1)
830 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000831
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000832 def test_readlines(self):
833 def bufio():
834 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
835 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000836 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
837 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
838 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000839
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000840 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000841 data = b"abcdefghi"
842 dlen = len(data)
843
844 tests = [
845 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
846 [ 100, [ 3, 3, 3], [ dlen ] ],
847 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
848 ]
849
850 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000851 rawio = self.MockFileIO(data)
852 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000853 pos = 0
854 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000855 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000856 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000857 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000858 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000859
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000860 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000861 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000862 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
863 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000864 self.assertEqual(b"abcd", bufio.read(6))
865 self.assertEqual(b"e", bufio.read(1))
866 self.assertEqual(b"fg", bufio.read())
867 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200868 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000869 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000870
Victor Stinnera80987f2011-05-25 22:47:16 +0200871 rawio = self.MockRawIO((b"a", None, None))
872 self.assertEqual(b"a", rawio.readall())
873 self.assertIsNone(rawio.readall())
874
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000875 def test_read_past_eof(self):
876 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
877 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000878
Ezio Melottib3aedd42010-11-20 19:04:17 +0000879 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000880
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000881 def test_read_all(self):
882 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
883 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000884
Ezio Melottib3aedd42010-11-20 19:04:17 +0000885 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000886
Victor Stinner45df8202010-04-28 22:31:17 +0000887 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000888 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000889 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000890 try:
891 # Write out many bytes with exactly the same number of 0's,
892 # 1's... 255's. This will help us check that concurrent reading
893 # doesn't duplicate or forget contents.
894 N = 1000
895 l = list(range(256)) * N
896 random.shuffle(l)
897 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000898 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000899 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000900 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000901 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000902 errors = []
903 results = []
904 def f():
905 try:
906 # Intra-buffer read then buffer-flushing read
907 for n in cycle([1, 19]):
908 s = bufio.read(n)
909 if not s:
910 break
911 # list.append() is atomic
912 results.append(s)
913 except Exception as e:
914 errors.append(e)
915 raise
916 threads = [threading.Thread(target=f) for x in range(20)]
917 for t in threads:
918 t.start()
919 time.sleep(0.02) # yield
920 for t in threads:
921 t.join()
922 self.assertFalse(errors,
923 "the following exceptions were caught: %r" % errors)
924 s = b''.join(results)
925 for i in range(256):
926 c = bytes(bytearray([i]))
927 self.assertEqual(s.count(c), N)
928 finally:
929 support.unlink(support.TESTFN)
930
Antoine Pitrou1e44fec2011-10-04 12:26:20 +0200931 def test_unseekable(self):
932 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
933 self.assertRaises(self.UnsupportedOperation, bufio.tell)
934 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
935 bufio.read(1)
936 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
937 self.assertRaises(self.UnsupportedOperation, bufio.tell)
938
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000939 def test_misbehaved_io(self):
940 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
941 bufio = self.tp(rawio)
942 self.assertRaises(IOError, bufio.seek, 0)
943 self.assertRaises(IOError, bufio.tell)
944
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000945 def test_no_extraneous_read(self):
946 # Issue #9550; when the raw IO object has satisfied the read request,
947 # we should not issue any additional reads, otherwise it may block
948 # (e.g. socket).
949 bufsize = 16
950 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
951 rawio = self.MockRawIO([b"x" * n])
952 bufio = self.tp(rawio, bufsize)
953 self.assertEqual(bufio.read(n), b"x" * n)
954 # Simple case: one raw read is enough to satisfy the request.
955 self.assertEqual(rawio._extraneous_reads, 0,
956 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
957 # A more complex case where two raw reads are needed to satisfy
958 # the request.
959 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
960 bufio = self.tp(rawio, bufsize)
961 self.assertEqual(bufio.read(n), b"x" * n)
962 self.assertEqual(rawio._extraneous_reads, 0,
963 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
964
965
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000966class CBufferedReaderTest(BufferedReaderTest):
967 tp = io.BufferedReader
968
969 def test_constructor(self):
970 BufferedReaderTest.test_constructor(self)
971 # The allocation can succeed on 32-bit builds, e.g. with more
972 # than 2GB RAM and a 64-bit kernel.
973 if sys.maxsize > 0x7FFFFFFF:
974 rawio = self.MockRawIO()
975 bufio = self.tp(rawio)
976 self.assertRaises((OverflowError, MemoryError, ValueError),
977 bufio.__init__, rawio, sys.maxsize)
978
979 def test_initialization(self):
980 rawio = self.MockRawIO([b"abc"])
981 bufio = self.tp(rawio)
982 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
983 self.assertRaises(ValueError, bufio.read)
984 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
985 self.assertRaises(ValueError, bufio.read)
986 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
987 self.assertRaises(ValueError, bufio.read)
988
989 def test_misbehaved_io_read(self):
990 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
991 bufio = self.tp(rawio)
992 # _pyio.BufferedReader seems to implement reading different, so that
993 # checking this is not so easy.
994 self.assertRaises(IOError, bufio.read, 10)
995
996 def test_garbage_collection(self):
997 # C BufferedReader objects are collected.
998 # The Python version has __del__, so it ends into gc.garbage instead
999 rawio = self.FileIO(support.TESTFN, "w+b")
1000 f = self.tp(rawio)
1001 f.f = f
1002 wr = weakref.ref(f)
1003 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +00001004 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001005 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001006
1007class PyBufferedReaderTest(BufferedReaderTest):
1008 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001009
Guido van Rossuma9e20242007-03-08 00:43:48 +00001010
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001011class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1012 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001013
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001014 def test_constructor(self):
1015 rawio = self.MockRawIO()
1016 bufio = self.tp(rawio)
1017 bufio.__init__(rawio)
1018 bufio.__init__(rawio, buffer_size=1024)
1019 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001020 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001021 bufio.flush()
1022 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1023 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1024 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1025 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001026 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001027 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001028 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001029
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001030 def test_detach_flush(self):
1031 raw = self.MockRawIO()
1032 buf = self.tp(raw)
1033 buf.write(b"howdy!")
1034 self.assertFalse(raw._write_stack)
1035 buf.detach()
1036 self.assertEqual(raw._write_stack, [b"howdy!"])
1037
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001038 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001039 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001040 writer = self.MockRawIO()
1041 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001042 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001043 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001044
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001045 def test_write_overflow(self):
1046 writer = self.MockRawIO()
1047 bufio = self.tp(writer, 8)
1048 contents = b"abcdefghijklmnop"
1049 for n in range(0, len(contents), 3):
1050 bufio.write(contents[n:n+3])
1051 flushed = b"".join(writer._write_stack)
1052 # At least (total - 8) bytes were implicitly flushed, perhaps more
1053 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001054 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001055
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001056 def check_writes(self, intermediate_func):
1057 # Lots of writes, test the flushed output is as expected.
1058 contents = bytes(range(256)) * 1000
1059 n = 0
1060 writer = self.MockRawIO()
1061 bufio = self.tp(writer, 13)
1062 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1063 def gen_sizes():
1064 for size in count(1):
1065 for i in range(15):
1066 yield size
1067 sizes = gen_sizes()
1068 while n < len(contents):
1069 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001070 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001071 intermediate_func(bufio)
1072 n += size
1073 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001074 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001075
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001076 def test_writes(self):
1077 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001078
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001079 def test_writes_and_flushes(self):
1080 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001081
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001082 def test_writes_and_seeks(self):
1083 def _seekabs(bufio):
1084 pos = bufio.tell()
1085 bufio.seek(pos + 1, 0)
1086 bufio.seek(pos - 1, 0)
1087 bufio.seek(pos, 0)
1088 self.check_writes(_seekabs)
1089 def _seekrel(bufio):
1090 pos = bufio.seek(0, 1)
1091 bufio.seek(+1, 1)
1092 bufio.seek(-1, 1)
1093 bufio.seek(pos, 0)
1094 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001095
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001096 def test_writes_and_truncates(self):
1097 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001098
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001099 def test_write_non_blocking(self):
1100 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001101 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001102
Ezio Melottib3aedd42010-11-20 19:04:17 +00001103 self.assertEqual(bufio.write(b"abcd"), 4)
1104 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001105 # 1 byte will be written, the rest will be buffered
1106 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001107 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001108
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001109 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1110 raw.block_on(b"0")
1111 try:
1112 bufio.write(b"opqrwxyz0123456789")
1113 except self.BlockingIOError as e:
1114 written = e.characters_written
1115 else:
1116 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001117 self.assertEqual(written, 16)
1118 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001119 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001120
Ezio Melottib3aedd42010-11-20 19:04:17 +00001121 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001122 s = raw.pop_written()
1123 # Previously buffered bytes were flushed
1124 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001125
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001126 def test_write_and_rewind(self):
1127 raw = io.BytesIO()
1128 bufio = self.tp(raw, 4)
1129 self.assertEqual(bufio.write(b"abcdef"), 6)
1130 self.assertEqual(bufio.tell(), 6)
1131 bufio.seek(0, 0)
1132 self.assertEqual(bufio.write(b"XY"), 2)
1133 bufio.seek(6, 0)
1134 self.assertEqual(raw.getvalue(), b"XYcdef")
1135 self.assertEqual(bufio.write(b"123456"), 6)
1136 bufio.flush()
1137 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001138
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001139 def test_flush(self):
1140 writer = self.MockRawIO()
1141 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001142 bufio.write(b"abc")
1143 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001144 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001145
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001146 def test_destructor(self):
1147 writer = self.MockRawIO()
1148 bufio = self.tp(writer, 8)
1149 bufio.write(b"abc")
1150 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001151 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001152 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001153
1154 def test_truncate(self):
1155 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001156 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001157 bufio = self.tp(raw, 8)
1158 bufio.write(b"abcdef")
1159 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001160 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001161 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001162 self.assertEqual(f.read(), b"abc")
1163
Victor Stinner45df8202010-04-28 22:31:17 +00001164 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001165 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001166 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001167 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001168 # Write out many bytes from many threads and test they were
1169 # all flushed.
1170 N = 1000
1171 contents = bytes(range(256)) * N
1172 sizes = cycle([1, 19])
1173 n = 0
1174 queue = deque()
1175 while n < len(contents):
1176 size = next(sizes)
1177 queue.append(contents[n:n+size])
1178 n += size
1179 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001180 # We use a real file object because it allows us to
1181 # exercise situations where the GIL is released before
1182 # writing the buffer to the raw streams. This is in addition
1183 # to concurrency issues due to switching threads in the middle
1184 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001185 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001186 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001187 errors = []
1188 def f():
1189 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001190 while True:
1191 try:
1192 s = queue.popleft()
1193 except IndexError:
1194 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001195 bufio.write(s)
1196 except Exception as e:
1197 errors.append(e)
1198 raise
1199 threads = [threading.Thread(target=f) for x in range(20)]
1200 for t in threads:
1201 t.start()
1202 time.sleep(0.02) # yield
1203 for t in threads:
1204 t.join()
1205 self.assertFalse(errors,
1206 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001207 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001208 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001209 s = f.read()
1210 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001211 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001212 finally:
1213 support.unlink(support.TESTFN)
1214
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001215 def test_misbehaved_io(self):
1216 rawio = self.MisbehavedRawIO()
1217 bufio = self.tp(rawio, 5)
1218 self.assertRaises(IOError, bufio.seek, 0)
1219 self.assertRaises(IOError, bufio.tell)
1220 self.assertRaises(IOError, bufio.write, b"abcdef")
1221
Benjamin Peterson59406a92009-03-26 17:10:29 +00001222 def test_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001223 with support.check_warnings(("max_buffer_size is deprecated",
1224 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001225 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001226
1227
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001228class CBufferedWriterTest(BufferedWriterTest):
1229 tp = io.BufferedWriter
1230
1231 def test_constructor(self):
1232 BufferedWriterTest.test_constructor(self)
1233 # The allocation can succeed on 32-bit builds, e.g. with more
1234 # than 2GB RAM and a 64-bit kernel.
1235 if sys.maxsize > 0x7FFFFFFF:
1236 rawio = self.MockRawIO()
1237 bufio = self.tp(rawio)
1238 self.assertRaises((OverflowError, MemoryError, ValueError),
1239 bufio.__init__, rawio, sys.maxsize)
1240
1241 def test_initialization(self):
1242 rawio = self.MockRawIO()
1243 bufio = self.tp(rawio)
1244 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1245 self.assertRaises(ValueError, bufio.write, b"def")
1246 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1247 self.assertRaises(ValueError, bufio.write, b"def")
1248 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1249 self.assertRaises(ValueError, bufio.write, b"def")
1250
1251 def test_garbage_collection(self):
1252 # C BufferedWriter objects are collected, and collecting them flushes
1253 # all data to disk.
1254 # The Python version has __del__, so it ends into gc.garbage instead
1255 rawio = self.FileIO(support.TESTFN, "w+b")
1256 f = self.tp(rawio)
1257 f.write(b"123xxx")
1258 f.x = f
1259 wr = weakref.ref(f)
1260 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001261 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001262 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001263 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001264 self.assertEqual(f.read(), b"123xxx")
1265
1266
1267class PyBufferedWriterTest(BufferedWriterTest):
1268 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001269
Guido van Rossum01a27522007-03-07 01:00:12 +00001270class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001271
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001272 def test_constructor(self):
1273 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001274 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001275
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001276 def test_detach(self):
1277 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1278 self.assertRaises(self.UnsupportedOperation, pair.detach)
1279
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001280 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001281 with support.check_warnings(("max_buffer_size is deprecated",
1282 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001283 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001284
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001285 def test_constructor_with_not_readable(self):
1286 class NotReadable(MockRawIO):
1287 def readable(self):
1288 return False
1289
1290 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1291
1292 def test_constructor_with_not_writeable(self):
1293 class NotWriteable(MockRawIO):
1294 def writable(self):
1295 return False
1296
1297 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1298
1299 def test_read(self):
1300 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1301
1302 self.assertEqual(pair.read(3), b"abc")
1303 self.assertEqual(pair.read(1), b"d")
1304 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001305 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1306 self.assertEqual(pair.read(None), b"abc")
1307
1308 def test_readlines(self):
1309 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1310 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1311 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1312 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001313
1314 def test_read1(self):
1315 # .read1() is delegated to the underlying reader object, so this test
1316 # can be shallow.
1317 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1318
1319 self.assertEqual(pair.read1(3), b"abc")
1320
1321 def test_readinto(self):
1322 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1323
1324 data = bytearray(5)
1325 self.assertEqual(pair.readinto(data), 5)
1326 self.assertEqual(data, b"abcde")
1327
1328 def test_write(self):
1329 w = self.MockRawIO()
1330 pair = self.tp(self.MockRawIO(), w)
1331
1332 pair.write(b"abc")
1333 pair.flush()
1334 pair.write(b"def")
1335 pair.flush()
1336 self.assertEqual(w._write_stack, [b"abc", b"def"])
1337
1338 def test_peek(self):
1339 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1340
1341 self.assertTrue(pair.peek(3).startswith(b"abc"))
1342 self.assertEqual(pair.read(3), b"abc")
1343
1344 def test_readable(self):
1345 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1346 self.assertTrue(pair.readable())
1347
1348 def test_writeable(self):
1349 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1350 self.assertTrue(pair.writable())
1351
1352 def test_seekable(self):
1353 # BufferedRWPairs are never seekable, even if their readers and writers
1354 # are.
1355 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1356 self.assertFalse(pair.seekable())
1357
1358 # .flush() is delegated to the underlying writer object and has been
1359 # tested in the test_write method.
1360
1361 def test_close_and_closed(self):
1362 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1363 self.assertFalse(pair.closed)
1364 pair.close()
1365 self.assertTrue(pair.closed)
1366
1367 def test_isatty(self):
1368 class SelectableIsAtty(MockRawIO):
1369 def __init__(self, isatty):
1370 MockRawIO.__init__(self)
1371 self._isatty = isatty
1372
1373 def isatty(self):
1374 return self._isatty
1375
1376 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1377 self.assertFalse(pair.isatty())
1378
1379 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1380 self.assertTrue(pair.isatty())
1381
1382 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1383 self.assertTrue(pair.isatty())
1384
1385 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1386 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001387
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001388class CBufferedRWPairTest(BufferedRWPairTest):
1389 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001390
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001391class PyBufferedRWPairTest(BufferedRWPairTest):
1392 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001393
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001394
1395class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1396 read_mode = "rb+"
1397 write_mode = "wb+"
1398
1399 def test_constructor(self):
1400 BufferedReaderTest.test_constructor(self)
1401 BufferedWriterTest.test_constructor(self)
1402
1403 def test_read_and_write(self):
1404 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001405 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001406
1407 self.assertEqual(b"as", rw.read(2))
1408 rw.write(b"ddd")
1409 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001410 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001411 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001412 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001413
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001414 def test_seek_and_tell(self):
1415 raw = self.BytesIO(b"asdfghjkl")
1416 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001417
Ezio Melottib3aedd42010-11-20 19:04:17 +00001418 self.assertEqual(b"as", rw.read(2))
1419 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001420 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001421 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001422
Antoine Pitroue05565e2011-08-20 14:39:23 +02001423 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001424 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001425 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001426 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001427 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001428 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001429 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001430 self.assertEqual(7, rw.tell())
1431 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001432 rw.flush()
1433 self.assertEqual(b"asdf123fl", raw.getvalue())
1434
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001435 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001436
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001437 def check_flush_and_read(self, read_func):
1438 raw = self.BytesIO(b"abcdefghi")
1439 bufio = self.tp(raw)
1440
Ezio Melottib3aedd42010-11-20 19:04:17 +00001441 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001442 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001443 self.assertEqual(b"ef", read_func(bufio, 2))
1444 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001445 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001446 self.assertEqual(6, bufio.tell())
1447 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001448 raw.seek(0, 0)
1449 raw.write(b"XYZ")
1450 # flush() resets the read buffer
1451 bufio.flush()
1452 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001453 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001454
1455 def test_flush_and_read(self):
1456 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1457
1458 def test_flush_and_readinto(self):
1459 def _readinto(bufio, n=-1):
1460 b = bytearray(n if n >= 0 else 9999)
1461 n = bufio.readinto(b)
1462 return bytes(b[:n])
1463 self.check_flush_and_read(_readinto)
1464
1465 def test_flush_and_peek(self):
1466 def _peek(bufio, n=-1):
1467 # This relies on the fact that the buffer can contain the whole
1468 # raw stream, otherwise peek() can return less.
1469 b = bufio.peek(n)
1470 if n != -1:
1471 b = b[:n]
1472 bufio.seek(len(b), 1)
1473 return b
1474 self.check_flush_and_read(_peek)
1475
1476 def test_flush_and_write(self):
1477 raw = self.BytesIO(b"abcdefghi")
1478 bufio = self.tp(raw)
1479
1480 bufio.write(b"123")
1481 bufio.flush()
1482 bufio.write(b"45")
1483 bufio.flush()
1484 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001485 self.assertEqual(b"12345fghi", raw.getvalue())
1486 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001487
1488 def test_threads(self):
1489 BufferedReaderTest.test_threads(self)
1490 BufferedWriterTest.test_threads(self)
1491
1492 def test_writes_and_peek(self):
1493 def _peek(bufio):
1494 bufio.peek(1)
1495 self.check_writes(_peek)
1496 def _peek(bufio):
1497 pos = bufio.tell()
1498 bufio.seek(-1, 1)
1499 bufio.peek(1)
1500 bufio.seek(pos, 0)
1501 self.check_writes(_peek)
1502
1503 def test_writes_and_reads(self):
1504 def _read(bufio):
1505 bufio.seek(-1, 1)
1506 bufio.read(1)
1507 self.check_writes(_read)
1508
1509 def test_writes_and_read1s(self):
1510 def _read1(bufio):
1511 bufio.seek(-1, 1)
1512 bufio.read1(1)
1513 self.check_writes(_read1)
1514
1515 def test_writes_and_readintos(self):
1516 def _read(bufio):
1517 bufio.seek(-1, 1)
1518 bufio.readinto(bytearray(1))
1519 self.check_writes(_read)
1520
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001521 def test_write_after_readahead(self):
1522 # Issue #6629: writing after the buffer was filled by readahead should
1523 # first rewind the raw stream.
1524 for overwrite_size in [1, 5]:
1525 raw = self.BytesIO(b"A" * 10)
1526 bufio = self.tp(raw, 4)
1527 # Trigger readahead
1528 self.assertEqual(bufio.read(1), b"A")
1529 self.assertEqual(bufio.tell(), 1)
1530 # Overwriting should rewind the raw stream if it needs so
1531 bufio.write(b"B" * overwrite_size)
1532 self.assertEqual(bufio.tell(), overwrite_size + 1)
1533 # If the write size was smaller than the buffer size, flush() and
1534 # check that rewind happens.
1535 bufio.flush()
1536 self.assertEqual(bufio.tell(), overwrite_size + 1)
1537 s = raw.getvalue()
1538 self.assertEqual(s,
1539 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1540
Antoine Pitrou7c404892011-05-13 00:13:33 +02001541 def test_write_rewind_write(self):
1542 # Various combinations of reading / writing / seeking backwards / writing again
1543 def mutate(bufio, pos1, pos2):
1544 assert pos2 >= pos1
1545 # Fill the buffer
1546 bufio.seek(pos1)
1547 bufio.read(pos2 - pos1)
1548 bufio.write(b'\x02')
1549 # This writes earlier than the previous write, but still inside
1550 # the buffer.
1551 bufio.seek(pos1)
1552 bufio.write(b'\x01')
1553
1554 b = b"\x80\x81\x82\x83\x84"
1555 for i in range(0, len(b)):
1556 for j in range(i, len(b)):
1557 raw = self.BytesIO(b)
1558 bufio = self.tp(raw, 100)
1559 mutate(bufio, i, j)
1560 bufio.flush()
1561 expected = bytearray(b)
1562 expected[j] = 2
1563 expected[i] = 1
1564 self.assertEqual(raw.getvalue(), expected,
1565 "failed result for i=%d, j=%d" % (i, j))
1566
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001567 def test_truncate_after_read_or_write(self):
1568 raw = self.BytesIO(b"A" * 10)
1569 bufio = self.tp(raw, 100)
1570 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1571 self.assertEqual(bufio.truncate(), 2)
1572 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1573 self.assertEqual(bufio.truncate(), 4)
1574
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001575 def test_misbehaved_io(self):
1576 BufferedReaderTest.test_misbehaved_io(self)
1577 BufferedWriterTest.test_misbehaved_io(self)
1578
Antoine Pitroue05565e2011-08-20 14:39:23 +02001579 def test_interleaved_read_write(self):
1580 # Test for issue #12213
1581 with self.BytesIO(b'abcdefgh') as raw:
1582 with self.tp(raw, 100) as f:
1583 f.write(b"1")
1584 self.assertEqual(f.read(1), b'b')
1585 f.write(b'2')
1586 self.assertEqual(f.read1(1), b'd')
1587 f.write(b'3')
1588 buf = bytearray(1)
1589 f.readinto(buf)
1590 self.assertEqual(buf, b'f')
1591 f.write(b'4')
1592 self.assertEqual(f.peek(1), b'h')
1593 f.flush()
1594 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1595
1596 with self.BytesIO(b'abc') as raw:
1597 with self.tp(raw, 100) as f:
1598 self.assertEqual(f.read(1), b'a')
1599 f.write(b"2")
1600 self.assertEqual(f.read(1), b'c')
1601 f.flush()
1602 self.assertEqual(raw.getvalue(), b'a2c')
1603
1604 def test_interleaved_readline_write(self):
1605 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1606 with self.tp(raw) as f:
1607 f.write(b'1')
1608 self.assertEqual(f.readline(), b'b\n')
1609 f.write(b'2')
1610 self.assertEqual(f.readline(), b'def\n')
1611 f.write(b'3')
1612 self.assertEqual(f.readline(), b'\n')
1613 f.flush()
1614 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1615
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001616 # You can't construct a BufferedRandom over a non-seekable stream.
1617 test_unseekable = None
1618
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001619class CBufferedRandomTest(BufferedRandomTest):
1620 tp = io.BufferedRandom
1621
1622 def test_constructor(self):
1623 BufferedRandomTest.test_constructor(self)
1624 # The allocation can succeed on 32-bit builds, e.g. with more
1625 # than 2GB RAM and a 64-bit kernel.
1626 if sys.maxsize > 0x7FFFFFFF:
1627 rawio = self.MockRawIO()
1628 bufio = self.tp(rawio)
1629 self.assertRaises((OverflowError, MemoryError, ValueError),
1630 bufio.__init__, rawio, sys.maxsize)
1631
1632 def test_garbage_collection(self):
1633 CBufferedReaderTest.test_garbage_collection(self)
1634 CBufferedWriterTest.test_garbage_collection(self)
1635
1636class PyBufferedRandomTest(BufferedRandomTest):
1637 tp = pyio.BufferedRandom
1638
1639
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001640# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1641# properties:
1642# - A single output character can correspond to many bytes of input.
1643# - The number of input bytes to complete the character can be
1644# undetermined until the last input byte is received.
1645# - The number of input bytes can vary depending on previous input.
1646# - A single input byte can correspond to many characters of output.
1647# - The number of output characters can be undetermined until the
1648# last input byte is received.
1649# - The number of output characters can vary depending on previous input.
1650
1651class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1652 """
1653 For testing seek/tell behavior with a stateful, buffering decoder.
1654
1655 Input is a sequence of words. Words may be fixed-length (length set
1656 by input) or variable-length (period-terminated). In variable-length
1657 mode, extra periods are ignored. Possible words are:
1658 - 'i' followed by a number sets the input length, I (maximum 99).
1659 When I is set to 0, words are space-terminated.
1660 - 'o' followed by a number sets the output length, O (maximum 99).
1661 - Any other word is converted into a word followed by a period on
1662 the output. The output word consists of the input word truncated
1663 or padded out with hyphens to make its length equal to O. If O
1664 is 0, the word is output verbatim without truncating or padding.
1665 I and O are initially set to 1. When I changes, any buffered input is
1666 re-scanned according to the new I. EOF also terminates the last word.
1667 """
1668
1669 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001670 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001671 self.reset()
1672
1673 def __repr__(self):
1674 return '<SID %x>' % id(self)
1675
1676 def reset(self):
1677 self.i = 1
1678 self.o = 1
1679 self.buffer = bytearray()
1680
1681 def getstate(self):
1682 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1683 return bytes(self.buffer), i*100 + o
1684
1685 def setstate(self, state):
1686 buffer, io = state
1687 self.buffer = bytearray(buffer)
1688 i, o = divmod(io, 100)
1689 self.i, self.o = i ^ 1, o ^ 1
1690
1691 def decode(self, input, final=False):
1692 output = ''
1693 for b in input:
1694 if self.i == 0: # variable-length, terminated with period
1695 if b == ord('.'):
1696 if self.buffer:
1697 output += self.process_word()
1698 else:
1699 self.buffer.append(b)
1700 else: # fixed-length, terminate after self.i bytes
1701 self.buffer.append(b)
1702 if len(self.buffer) == self.i:
1703 output += self.process_word()
1704 if final and self.buffer: # EOF terminates the last word
1705 output += self.process_word()
1706 return output
1707
1708 def process_word(self):
1709 output = ''
1710 if self.buffer[0] == ord('i'):
1711 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1712 elif self.buffer[0] == ord('o'):
1713 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1714 else:
1715 output = self.buffer.decode('ascii')
1716 if len(output) < self.o:
1717 output += '-'*self.o # pad out with hyphens
1718 if self.o:
1719 output = output[:self.o] # truncate to output length
1720 output += '.'
1721 self.buffer = bytearray()
1722 return output
1723
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001724 codecEnabled = False
1725
1726 @classmethod
1727 def lookupTestDecoder(cls, name):
1728 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001729 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001730 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001731 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001732 incrementalencoder=None,
1733 streamreader=None, streamwriter=None,
1734 incrementaldecoder=cls)
1735
1736# Register the previous decoder for testing.
1737# Disabled by default, tests will enable it.
1738codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1739
1740
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001741class StatefulIncrementalDecoderTest(unittest.TestCase):
1742 """
1743 Make sure the StatefulIncrementalDecoder actually works.
1744 """
1745
1746 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001747 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001748 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001749 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001750 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001751 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001752 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001753 # I=0, O=6 (variable-length input, fixed-length output)
1754 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1755 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001756 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001757 # I=6, O=3 (fixed-length input > fixed-length output)
1758 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1759 # I=0, then 3; O=29, then 15 (with longer output)
1760 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1761 'a----------------------------.' +
1762 'b----------------------------.' +
1763 'cde--------------------------.' +
1764 'abcdefghijabcde.' +
1765 'a.b------------.' +
1766 '.c.------------.' +
1767 'd.e------------.' +
1768 'k--------------.' +
1769 'l--------------.' +
1770 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001771 ]
1772
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001773 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001774 # Try a few one-shot test cases.
1775 for input, eof, output in self.test_cases:
1776 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001777 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001778
1779 # Also test an unfinished decode, followed by forcing EOF.
1780 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001781 self.assertEqual(d.decode(b'oiabcd'), '')
1782 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001783
1784class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001785
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001786 def setUp(self):
1787 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1788 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001789 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001790
Guido van Rossumd0712812007-04-11 16:32:43 +00001791 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001792 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001793
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001794 def test_constructor(self):
1795 r = self.BytesIO(b"\xc3\xa9\n\n")
1796 b = self.BufferedReader(r, 1000)
1797 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001798 t.__init__(b, encoding="latin-1", newline="\r\n")
1799 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001800 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001801 t.__init__(b, encoding="utf-8", line_buffering=True)
1802 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001803 self.assertEqual(t.line_buffering, True)
1804 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001805 self.assertRaises(TypeError, t.__init__, b, newline=42)
1806 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1807
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001808 def test_detach(self):
1809 r = self.BytesIO()
1810 b = self.BufferedWriter(r)
1811 t = self.TextIOWrapper(b)
1812 self.assertIs(t.detach(), b)
1813
1814 t = self.TextIOWrapper(b, encoding="ascii")
1815 t.write("howdy")
1816 self.assertFalse(r.getvalue())
1817 t.detach()
1818 self.assertEqual(r.getvalue(), b"howdy")
1819 self.assertRaises(ValueError, t.detach)
1820
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001821 def test_repr(self):
1822 raw = self.BytesIO("hello".encode("utf-8"))
1823 b = self.BufferedReader(raw)
1824 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001825 modname = self.TextIOWrapper.__module__
1826 self.assertEqual(repr(t),
1827 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1828 raw.name = "dummy"
1829 self.assertEqual(repr(t),
1830 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001831 t.mode = "r"
1832 self.assertEqual(repr(t),
1833 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001834 raw.name = b"dummy"
1835 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001836 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001837
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001838 def test_line_buffering(self):
1839 r = self.BytesIO()
1840 b = self.BufferedWriter(r, 1000)
1841 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001842 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001843 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001844 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001845 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001846 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001847 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001848
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001849 def test_encoding(self):
1850 # Check the encoding attribute is always set, and valid
1851 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00001852 t = self.TextIOWrapper(b, encoding="utf-8")
1853 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001854 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001855 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001856 codecs.lookup(t.encoding)
1857
1858 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001859 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001860 b = self.BytesIO(b"abc\n\xff\n")
1861 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001862 self.assertRaises(UnicodeError, t.read)
1863 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001864 b = self.BytesIO(b"abc\n\xff\n")
1865 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001866 self.assertRaises(UnicodeError, t.read)
1867 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001868 b = self.BytesIO(b"abc\n\xff\n")
1869 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001870 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001871 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001872 b = self.BytesIO(b"abc\n\xff\n")
1873 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001874 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001875
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001876 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001877 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001878 b = self.BytesIO()
1879 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001880 self.assertRaises(UnicodeError, t.write, "\xff")
1881 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001882 b = self.BytesIO()
1883 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001884 self.assertRaises(UnicodeError, t.write, "\xff")
1885 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001886 b = self.BytesIO()
1887 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001888 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001889 t.write("abc\xffdef\n")
1890 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001891 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001892 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001893 b = self.BytesIO()
1894 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001895 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001896 t.write("abc\xffdef\n")
1897 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001898 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001899
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001900 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001901 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1902
1903 tests = [
1904 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001905 [ '', input_lines ],
1906 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1907 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1908 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001909 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001910 encodings = (
1911 'utf-8', 'latin-1',
1912 'utf-16', 'utf-16-le', 'utf-16-be',
1913 'utf-32', 'utf-32-le', 'utf-32-be',
1914 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001915
Guido van Rossum8358db22007-08-18 21:39:55 +00001916 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001917 # character in TextIOWrapper._pending_line.
1918 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001919 # XXX: str.encode() should return bytes
1920 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001921 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001922 for bufsize in range(1, 10):
1923 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001924 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1925 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001926 encoding=encoding)
1927 if do_reads:
1928 got_lines = []
1929 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001930 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001931 if c2 == '':
1932 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00001933 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001934 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001935 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001936 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001937
1938 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001939 self.assertEqual(got_line, exp_line)
1940 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001941
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001942 def test_newlines_input(self):
1943 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001944 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1945 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03001946 (None, normalized.decode("ascii").splitlines(keepends=True)),
1947 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001948 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1949 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1950 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001951 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001952 buf = self.BytesIO(testdata)
1953 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001954 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00001955 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001956 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001957
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001958 def test_newlines_output(self):
1959 testdict = {
1960 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1961 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1962 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1963 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1964 }
1965 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1966 for newline, expected in tests:
1967 buf = self.BytesIO()
1968 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1969 txt.write("AAA\nB")
1970 txt.write("BB\nCCC\n")
1971 txt.write("X\rY\r\nZ")
1972 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001973 self.assertEqual(buf.closed, False)
1974 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001975
1976 def test_destructor(self):
1977 l = []
1978 base = self.BytesIO
1979 class MyBytesIO(base):
1980 def close(self):
1981 l.append(self.getvalue())
1982 base.close(self)
1983 b = MyBytesIO()
1984 t = self.TextIOWrapper(b, encoding="ascii")
1985 t.write("abc")
1986 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001987 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001988 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001989
1990 def test_override_destructor(self):
1991 record = []
1992 class MyTextIO(self.TextIOWrapper):
1993 def __del__(self):
1994 record.append(1)
1995 try:
1996 f = super().__del__
1997 except AttributeError:
1998 pass
1999 else:
2000 f()
2001 def close(self):
2002 record.append(2)
2003 super().close()
2004 def flush(self):
2005 record.append(3)
2006 super().flush()
2007 b = self.BytesIO()
2008 t = MyTextIO(b, encoding="ascii")
2009 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002010 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002011 self.assertEqual(record, [1, 2, 3])
2012
2013 def test_error_through_destructor(self):
2014 # Test that the exception state is not modified by a destructor,
2015 # even if close() fails.
2016 rawio = self.CloseFailureIO()
2017 def f():
2018 self.TextIOWrapper(rawio).xyzzy
2019 with support.captured_output("stderr") as s:
2020 self.assertRaises(AttributeError, f)
2021 s = s.getvalue().strip()
2022 if s:
2023 # The destructor *may* have printed an unraisable error, check it
2024 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002025 self.assertTrue(s.startswith("Exception IOError: "), s)
2026 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002027
Guido van Rossum9b76da62007-04-11 01:09:03 +00002028 # Systematic tests of the text I/O API
2029
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002030 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002031 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002032 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002033 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002034 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002035 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002036 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002037 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002038 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002039 self.assertEqual(f.tell(), 0)
2040 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002041 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002042 self.assertEqual(f.seek(0), 0)
2043 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002044 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002045 self.assertEqual(f.read(2), "ab")
2046 self.assertEqual(f.read(1), "c")
2047 self.assertEqual(f.read(1), "")
2048 self.assertEqual(f.read(), "")
2049 self.assertEqual(f.tell(), cookie)
2050 self.assertEqual(f.seek(0), 0)
2051 self.assertEqual(f.seek(0, 2), cookie)
2052 self.assertEqual(f.write("def"), 3)
2053 self.assertEqual(f.seek(cookie), cookie)
2054 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002055 if enc.startswith("utf"):
2056 self.multi_line_test(f, enc)
2057 f.close()
2058
2059 def multi_line_test(self, f, enc):
2060 f.seek(0)
2061 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002062 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002063 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002064 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002065 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002066 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002067 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002068 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002069 wlines.append((f.tell(), line))
2070 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002071 f.seek(0)
2072 rlines = []
2073 while True:
2074 pos = f.tell()
2075 line = f.readline()
2076 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002077 break
2078 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002079 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002080
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002081 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002082 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002083 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002084 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002085 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002086 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002087 p2 = f.tell()
2088 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002089 self.assertEqual(f.tell(), p0)
2090 self.assertEqual(f.readline(), "\xff\n")
2091 self.assertEqual(f.tell(), p1)
2092 self.assertEqual(f.readline(), "\xff\n")
2093 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002094 f.seek(0)
2095 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002096 self.assertEqual(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002097 self.assertRaises(IOError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002098 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002099 f.close()
2100
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002101 def test_seeking(self):
2102 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002103 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002104 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002105 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002106 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002107 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002108 suffix = bytes(u_suffix.encode("utf-8"))
2109 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002110 with self.open(support.TESTFN, "wb") as f:
2111 f.write(line*2)
2112 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2113 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002114 self.assertEqual(s, str(prefix, "ascii"))
2115 self.assertEqual(f.tell(), prefix_size)
2116 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002117
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002118 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002119 # Regression test for a specific bug
2120 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002121 with self.open(support.TESTFN, "wb") as f:
2122 f.write(data)
2123 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2124 f._CHUNK_SIZE # Just test that it exists
2125 f._CHUNK_SIZE = 2
2126 f.readline()
2127 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002128
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002129 def test_seek_and_tell(self):
2130 #Test seek/tell using the StatefulIncrementalDecoder.
2131 # Make test faster by doing smaller seeks
2132 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002133
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002134 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002135 """Tell/seek to various points within a data stream and ensure
2136 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002137 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002138 f.write(data)
2139 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002140 f = self.open(support.TESTFN, encoding='test_decoder')
2141 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002142 decoded = f.read()
2143 f.close()
2144
Neal Norwitze2b07052008-03-18 19:52:05 +00002145 for i in range(min_pos, len(decoded) + 1): # seek positions
2146 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002147 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002148 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002149 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002150 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002151 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002152 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002153 f.close()
2154
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002155 # Enable the test decoder.
2156 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002157
2158 # Run the tests.
2159 try:
2160 # Try each test case.
2161 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002162 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002163
2164 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002165 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2166 offset = CHUNK_SIZE - len(input)//2
2167 prefix = b'.'*offset
2168 # Don't bother seeking into the prefix (takes too long).
2169 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002170 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002171
2172 # Ensure our test decoder won't interfere with subsequent tests.
2173 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002174 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002175
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002176 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002177 data = "1234567890"
2178 tests = ("utf-16",
2179 "utf-16-le",
2180 "utf-16-be",
2181 "utf-32",
2182 "utf-32-le",
2183 "utf-32-be")
2184 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002185 buf = self.BytesIO()
2186 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002187 # Check if the BOM is written only once (see issue1753).
2188 f.write(data)
2189 f.write(data)
2190 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002191 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002192 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002193 self.assertEqual(f.read(), data * 2)
2194 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002195
Benjamin Petersona1b49012009-03-31 23:11:32 +00002196 def test_unreadable(self):
2197 class UnReadable(self.BytesIO):
2198 def readable(self):
2199 return False
2200 txt = self.TextIOWrapper(UnReadable())
2201 self.assertRaises(IOError, txt.read)
2202
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002203 def test_read_one_by_one(self):
2204 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002205 reads = ""
2206 while True:
2207 c = txt.read(1)
2208 if not c:
2209 break
2210 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002211 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002212
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002213 def test_readlines(self):
2214 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2215 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2216 txt.seek(0)
2217 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2218 txt.seek(0)
2219 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2220
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002221 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002222 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002223 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002224 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002225 reads = ""
2226 while True:
2227 c = txt.read(128)
2228 if not c:
2229 break
2230 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002231 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002232
2233 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002234 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002235
2236 # read one char at a time
2237 reads = ""
2238 while True:
2239 c = txt.read(1)
2240 if not c:
2241 break
2242 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002243 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002244
2245 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002246 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002247 txt._CHUNK_SIZE = 4
2248
2249 reads = ""
2250 while True:
2251 c = txt.read(4)
2252 if not c:
2253 break
2254 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002255 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002256
2257 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002258 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002259 txt._CHUNK_SIZE = 4
2260
2261 reads = txt.read(4)
2262 reads += txt.read(4)
2263 reads += txt.readline()
2264 reads += txt.readline()
2265 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002266 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002267
2268 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002269 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002270 txt._CHUNK_SIZE = 4
2271
2272 reads = txt.read(4)
2273 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002274 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002275
2276 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002277 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002278 txt._CHUNK_SIZE = 4
2279
2280 reads = txt.read(4)
2281 pos = txt.tell()
2282 txt.seek(0)
2283 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002284 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002285
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002286 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002287 buffer = self.BytesIO(self.testdata)
2288 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002289
2290 self.assertEqual(buffer.seekable(), txt.seekable())
2291
Antoine Pitroue4501852009-05-14 18:55:55 +00002292 def test_append_bom(self):
2293 # The BOM is not written again when appending to a non-empty file
2294 filename = support.TESTFN
2295 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2296 with self.open(filename, 'w', encoding=charset) as f:
2297 f.write('aaa')
2298 pos = f.tell()
2299 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002300 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002301
2302 with self.open(filename, 'a', encoding=charset) as f:
2303 f.write('xxx')
2304 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002305 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002306
2307 def test_seek_bom(self):
2308 # Same test, but when seeking manually
2309 filename = support.TESTFN
2310 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2311 with self.open(filename, 'w', encoding=charset) as f:
2312 f.write('aaa')
2313 pos = f.tell()
2314 with self.open(filename, 'r+', encoding=charset) as f:
2315 f.seek(pos)
2316 f.write('zzz')
2317 f.seek(0)
2318 f.write('bbb')
2319 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002320 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002321
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002322 def test_errors_property(self):
2323 with self.open(support.TESTFN, "w") as f:
2324 self.assertEqual(f.errors, "strict")
2325 with self.open(support.TESTFN, "w", errors="replace") as f:
2326 self.assertEqual(f.errors, "replace")
2327
Brett Cannon31f59292011-02-21 19:29:56 +00002328 @support.no_tracing
Victor Stinner45df8202010-04-28 22:31:17 +00002329 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002330 def test_threads_write(self):
2331 # Issue6750: concurrent writes could duplicate data
2332 event = threading.Event()
2333 with self.open(support.TESTFN, "w", buffering=1) as f:
2334 def run(n):
2335 text = "Thread%03d\n" % n
2336 event.wait()
2337 f.write(text)
2338 threads = [threading.Thread(target=lambda n=x: run(n))
2339 for x in range(20)]
2340 for t in threads:
2341 t.start()
2342 time.sleep(0.02)
2343 event.set()
2344 for t in threads:
2345 t.join()
2346 with self.open(support.TESTFN) as f:
2347 content = f.read()
2348 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002349 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002350
Antoine Pitrou6be88762010-05-03 16:48:20 +00002351 def test_flush_error_on_close(self):
2352 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2353 def bad_flush():
2354 raise IOError()
2355 txt.flush = bad_flush
2356 self.assertRaises(IOError, txt.close) # exception not swallowed
2357
2358 def test_multi_close(self):
2359 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2360 txt.close()
2361 txt.close()
2362 txt.close()
2363 self.assertRaises(ValueError, txt.flush)
2364
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002365 def test_unseekable(self):
2366 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2367 self.assertRaises(self.UnsupportedOperation, txt.tell)
2368 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2369
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002370 def test_readonly_attributes(self):
2371 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2372 buf = self.BytesIO(self.testdata)
2373 with self.assertRaises(AttributeError):
2374 txt.buffer = buf
2375
Antoine Pitroue96ec682011-07-23 21:46:35 +02002376 def test_rawio(self):
2377 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2378 # that subprocess.Popen() can have the required unbuffered
2379 # semantics with universal_newlines=True.
2380 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2381 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2382 # Reads
2383 self.assertEqual(txt.read(4), 'abcd')
2384 self.assertEqual(txt.readline(), 'efghi\n')
2385 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2386
2387 def test_rawio_write_through(self):
2388 # Issue #12591: with write_through=True, writes don't need a flush
2389 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2390 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2391 write_through=True)
2392 txt.write('1')
2393 txt.write('23\n4')
2394 txt.write('5')
2395 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2396
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002397class CTextIOWrapperTest(TextIOWrapperTest):
2398
2399 def test_initialization(self):
2400 r = self.BytesIO(b"\xc3\xa9\n\n")
2401 b = self.BufferedReader(r, 1000)
2402 t = self.TextIOWrapper(b)
2403 self.assertRaises(TypeError, t.__init__, b, newline=42)
2404 self.assertRaises(ValueError, t.read)
2405 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2406 self.assertRaises(ValueError, t.read)
2407
2408 def test_garbage_collection(self):
2409 # C TextIOWrapper objects are collected, and collecting them flushes
2410 # all data to disk.
2411 # The Python version has __del__, so it ends in gc.garbage instead.
2412 rawio = io.FileIO(support.TESTFN, "wb")
2413 b = self.BufferedWriter(rawio)
2414 t = self.TextIOWrapper(b, encoding="ascii")
2415 t.write("456def")
2416 t.x = t
2417 wr = weakref.ref(t)
2418 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002419 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002420 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002421 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002422 self.assertEqual(f.read(), b"456def")
2423
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002424 def test_rwpair_cleared_before_textio(self):
2425 # Issue 13070: TextIOWrapper's finalization would crash when called
2426 # after the reference to the underlying BufferedRWPair's writer got
2427 # cleared by the GC.
2428 for i in range(1000):
2429 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2430 t1 = self.TextIOWrapper(b1, encoding="ascii")
2431 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2432 t2 = self.TextIOWrapper(b2, encoding="ascii")
2433 # circular references
2434 t1.buddy = t2
2435 t2.buddy = t1
2436 support.gc_collect()
2437
2438
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002439class PyTextIOWrapperTest(TextIOWrapperTest):
2440 pass
2441
2442
2443class IncrementalNewlineDecoderTest(unittest.TestCase):
2444
2445 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002446 # UTF-8 specific tests for a newline decoder
2447 def _check_decode(b, s, **kwargs):
2448 # We exercise getstate() / setstate() as well as decode()
2449 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002450 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002451 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002452 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002453
Antoine Pitrou180a3362008-12-14 16:36:46 +00002454 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002455
Antoine Pitrou180a3362008-12-14 16:36:46 +00002456 _check_decode(b'\xe8', "")
2457 _check_decode(b'\xa2', "")
2458 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002459
Antoine Pitrou180a3362008-12-14 16:36:46 +00002460 _check_decode(b'\xe8', "")
2461 _check_decode(b'\xa2', "")
2462 _check_decode(b'\x88', "\u8888")
2463
2464 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002465 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2466
Antoine Pitrou180a3362008-12-14 16:36:46 +00002467 decoder.reset()
2468 _check_decode(b'\n', "\n")
2469 _check_decode(b'\r', "")
2470 _check_decode(b'', "\n", final=True)
2471 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002472
Antoine Pitrou180a3362008-12-14 16:36:46 +00002473 _check_decode(b'\r', "")
2474 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002475
Antoine Pitrou180a3362008-12-14 16:36:46 +00002476 _check_decode(b'\r\r\n', "\n\n")
2477 _check_decode(b'\r', "")
2478 _check_decode(b'\r', "\n")
2479 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002480
Antoine Pitrou180a3362008-12-14 16:36:46 +00002481 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2482 _check_decode(b'\xe8\xa2\x88', "\u8888")
2483 _check_decode(b'\n', "\n")
2484 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2485 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002486
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002487 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002488 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002489 if encoding is not None:
2490 encoder = codecs.getincrementalencoder(encoding)()
2491 def _decode_bytewise(s):
2492 # Decode one byte at a time
2493 for b in encoder.encode(s):
2494 result.append(decoder.decode(bytes([b])))
2495 else:
2496 encoder = None
2497 def _decode_bytewise(s):
2498 # Decode one char at a time
2499 for c in s:
2500 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002501 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002502 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002503 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002504 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002505 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002506 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002507 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002508 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002509 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002510 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002511 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002512 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002513 input = "abc"
2514 if encoder is not None:
2515 encoder.reset()
2516 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002517 self.assertEqual(decoder.decode(input), "abc")
2518 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002519
2520 def test_newline_decoder(self):
2521 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002522 # None meaning the IncrementalNewlineDecoder takes unicode input
2523 # rather than bytes input
2524 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002525 'utf-16', 'utf-16-le', 'utf-16-be',
2526 'utf-32', 'utf-32-le', 'utf-32-be',
2527 )
2528 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002529 decoder = enc and codecs.getincrementaldecoder(enc)()
2530 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2531 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002532 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002533 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2534 self.check_newline_decoding_utf8(decoder)
2535
Antoine Pitrou66913e22009-03-06 23:40:56 +00002536 def test_newline_bytes(self):
2537 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2538 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002539 self.assertEqual(dec.newlines, None)
2540 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2541 self.assertEqual(dec.newlines, None)
2542 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2543 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002544 dec = self.IncrementalNewlineDecoder(None, translate=False)
2545 _check(dec)
2546 dec = self.IncrementalNewlineDecoder(None, translate=True)
2547 _check(dec)
2548
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002549class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2550 pass
2551
2552class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2553 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002554
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002555
Guido van Rossum01a27522007-03-07 01:00:12 +00002556# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002557
Guido van Rossum5abbf752007-08-27 17:39:33 +00002558class MiscIOTest(unittest.TestCase):
2559
Barry Warsaw40e82462008-11-20 20:14:50 +00002560 def tearDown(self):
2561 support.unlink(support.TESTFN)
2562
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002563 def test___all__(self):
2564 for name in self.io.__all__:
2565 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002566 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002567 if name == "open":
2568 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002569 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002570 self.assertTrue(issubclass(obj, Exception), name)
2571 elif not name.startswith("SEEK_"):
2572 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002573
Barry Warsaw40e82462008-11-20 20:14:50 +00002574 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002575 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002576 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002577 f.close()
2578
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002579 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002580 self.assertEqual(f.name, support.TESTFN)
2581 self.assertEqual(f.buffer.name, support.TESTFN)
2582 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2583 self.assertEqual(f.mode, "U")
2584 self.assertEqual(f.buffer.mode, "rb")
2585 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002586 f.close()
2587
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002588 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002589 self.assertEqual(f.mode, "w+")
2590 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2591 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002592
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002593 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002594 self.assertEqual(g.mode, "wb")
2595 self.assertEqual(g.raw.mode, "wb")
2596 self.assertEqual(g.name, f.fileno())
2597 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002598 f.close()
2599 g.close()
2600
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002601 def test_io_after_close(self):
2602 for kwargs in [
2603 {"mode": "w"},
2604 {"mode": "wb"},
2605 {"mode": "w", "buffering": 1},
2606 {"mode": "w", "buffering": 2},
2607 {"mode": "wb", "buffering": 0},
2608 {"mode": "r"},
2609 {"mode": "rb"},
2610 {"mode": "r", "buffering": 1},
2611 {"mode": "r", "buffering": 2},
2612 {"mode": "rb", "buffering": 0},
2613 {"mode": "w+"},
2614 {"mode": "w+b"},
2615 {"mode": "w+", "buffering": 1},
2616 {"mode": "w+", "buffering": 2},
2617 {"mode": "w+b", "buffering": 0},
2618 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002619 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002620 f.close()
2621 self.assertRaises(ValueError, f.flush)
2622 self.assertRaises(ValueError, f.fileno)
2623 self.assertRaises(ValueError, f.isatty)
2624 self.assertRaises(ValueError, f.__iter__)
2625 if hasattr(f, "peek"):
2626 self.assertRaises(ValueError, f.peek, 1)
2627 self.assertRaises(ValueError, f.read)
2628 if hasattr(f, "read1"):
2629 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002630 if hasattr(f, "readall"):
2631 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002632 if hasattr(f, "readinto"):
2633 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2634 self.assertRaises(ValueError, f.readline)
2635 self.assertRaises(ValueError, f.readlines)
2636 self.assertRaises(ValueError, f.seek, 0)
2637 self.assertRaises(ValueError, f.tell)
2638 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002639 self.assertRaises(ValueError, f.write,
2640 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002641 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002642 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002643
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002644 def test_blockingioerror(self):
2645 # Various BlockingIOError issues
2646 self.assertRaises(TypeError, self.BlockingIOError)
2647 self.assertRaises(TypeError, self.BlockingIOError, 1)
2648 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2649 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2650 b = self.BlockingIOError(1, "")
2651 self.assertEqual(b.characters_written, 0)
2652 class C(str):
2653 pass
2654 c = C("")
2655 b = self.BlockingIOError(1, c)
2656 c.b = b
2657 b.c = c
2658 wr = weakref.ref(c)
2659 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002660 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002661 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002662
2663 def test_abcs(self):
2664 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002665 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2666 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2667 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2668 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002669
2670 def _check_abc_inheritance(self, abcmodule):
2671 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002672 self.assertIsInstance(f, abcmodule.IOBase)
2673 self.assertIsInstance(f, abcmodule.RawIOBase)
2674 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2675 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002676 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002677 self.assertIsInstance(f, abcmodule.IOBase)
2678 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2679 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2680 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002681 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002682 self.assertIsInstance(f, abcmodule.IOBase)
2683 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2684 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2685 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002686
2687 def test_abc_inheritance(self):
2688 # Test implementations inherit from their respective ABCs
2689 self._check_abc_inheritance(self)
2690
2691 def test_abc_inheritance_official(self):
2692 # Test implementations inherit from the official ABCs of the
2693 # baseline "io" module.
2694 self._check_abc_inheritance(io)
2695
Antoine Pitroue033e062010-10-29 10:38:18 +00002696 def _check_warn_on_dealloc(self, *args, **kwargs):
2697 f = open(*args, **kwargs)
2698 r = repr(f)
2699 with self.assertWarns(ResourceWarning) as cm:
2700 f = None
2701 support.gc_collect()
2702 self.assertIn(r, str(cm.warning.args[0]))
2703
2704 def test_warn_on_dealloc(self):
2705 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2706 self._check_warn_on_dealloc(support.TESTFN, "wb")
2707 self._check_warn_on_dealloc(support.TESTFN, "w")
2708
2709 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2710 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002711 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002712 for fd in fds:
2713 try:
2714 os.close(fd)
2715 except EnvironmentError as e:
2716 if e.errno != errno.EBADF:
2717 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002718 self.addCleanup(cleanup_fds)
2719 r, w = os.pipe()
2720 fds += r, w
2721 self._check_warn_on_dealloc(r, *args, **kwargs)
2722 # When using closefd=False, there's no warning
2723 r, w = os.pipe()
2724 fds += r, w
2725 with warnings.catch_warnings(record=True) as recorded:
2726 open(r, *args, closefd=False, **kwargs)
2727 support.gc_collect()
2728 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002729
2730 def test_warn_on_dealloc_fd(self):
2731 self._check_warn_on_dealloc_fd("rb", buffering=0)
2732 self._check_warn_on_dealloc_fd("rb")
2733 self._check_warn_on_dealloc_fd("r")
2734
2735
Antoine Pitrou243757e2010-11-05 21:15:39 +00002736 def test_pickling(self):
2737 # Pickling file objects is forbidden
2738 for kwargs in [
2739 {"mode": "w"},
2740 {"mode": "wb"},
2741 {"mode": "wb", "buffering": 0},
2742 {"mode": "r"},
2743 {"mode": "rb"},
2744 {"mode": "rb", "buffering": 0},
2745 {"mode": "w+"},
2746 {"mode": "w+b"},
2747 {"mode": "w+b", "buffering": 0},
2748 ]:
2749 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2750 with self.open(support.TESTFN, **kwargs) as f:
2751 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2752
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002753class CMiscIOTest(MiscIOTest):
2754 io = io
2755
2756class PyMiscIOTest(MiscIOTest):
2757 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002758
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002759
2760@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2761class SignalsTest(unittest.TestCase):
2762
2763 def setUp(self):
2764 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2765
2766 def tearDown(self):
2767 signal.signal(signal.SIGALRM, self.oldalrm)
2768
2769 def alarm_interrupt(self, sig, frame):
2770 1/0
2771
2772 @unittest.skipUnless(threading, 'Threading required for this test.')
2773 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2774 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00002775 invokes the signal handler, and bubbles up the exception raised
2776 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002777 read_results = []
2778 def _read():
Victor Stinnera9293352011-04-30 15:21:58 +02002779 if hasattr(signal, 'pthread_sigmask'):
2780 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002781 s = os.read(r, 1)
2782 read_results.append(s)
2783 t = threading.Thread(target=_read)
2784 t.daemon = True
2785 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00002786 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002787 try:
2788 wio = self.io.open(w, **fdopen_kwargs)
2789 t.start()
2790 signal.alarm(1)
2791 # Fill the pipe enough that the write will be blocking.
2792 # It will be interrupted by the timer armed above. Since the
2793 # other thread has read one byte, the low-level write will
2794 # return with a successful (partial) result rather than an EINTR.
2795 # The buffered IO layer must check for pending signal
2796 # handlers, which in this case will invoke alarm_interrupt().
2797 self.assertRaises(ZeroDivisionError,
Charles-François Natali2d517212011-05-29 16:36:44 +02002798 wio.write, item * (support.PIPE_MAX_SIZE // len(item)))
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002799 t.join()
2800 # We got one byte, get another one and check that it isn't a
2801 # repeat of the first one.
2802 read_results.append(os.read(r, 1))
2803 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2804 finally:
2805 os.close(w)
2806 os.close(r)
2807 # This is deliberate. If we didn't close the file descriptor
2808 # before closing wio, wio would try to flush its internal
2809 # buffer, and block again.
2810 try:
2811 wio.close()
2812 except IOError as e:
2813 if e.errno != errno.EBADF:
2814 raise
2815
2816 def test_interrupted_write_unbuffered(self):
2817 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2818
2819 def test_interrupted_write_buffered(self):
2820 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2821
2822 def test_interrupted_write_text(self):
2823 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2824
Brett Cannon31f59292011-02-21 19:29:56 +00002825 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002826 def check_reentrant_write(self, data, **fdopen_kwargs):
2827 def on_alarm(*args):
2828 # Will be called reentrantly from the same thread
2829 wio.write(data)
2830 1/0
2831 signal.signal(signal.SIGALRM, on_alarm)
2832 r, w = os.pipe()
2833 wio = self.io.open(w, **fdopen_kwargs)
2834 try:
2835 signal.alarm(1)
2836 # Either the reentrant call to wio.write() fails with RuntimeError,
2837 # or the signal handler raises ZeroDivisionError.
2838 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2839 while 1:
2840 for i in range(100):
2841 wio.write(data)
2842 wio.flush()
2843 # Make sure the buffer doesn't fill up and block further writes
2844 os.read(r, len(data) * 100)
2845 exc = cm.exception
2846 if isinstance(exc, RuntimeError):
2847 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2848 finally:
2849 wio.close()
2850 os.close(r)
2851
2852 def test_reentrant_write_buffered(self):
2853 self.check_reentrant_write(b"xy", mode="wb")
2854
2855 def test_reentrant_write_text(self):
2856 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2857
Antoine Pitrou707ce822011-02-25 21:24:11 +00002858 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2859 """Check that a buffered read, when it gets interrupted (either
2860 returning a partial result or EINTR), properly invokes the signal
2861 handler and retries if the latter returned successfully."""
2862 r, w = os.pipe()
2863 fdopen_kwargs["closefd"] = False
2864 def alarm_handler(sig, frame):
2865 os.write(w, b"bar")
2866 signal.signal(signal.SIGALRM, alarm_handler)
2867 try:
2868 rio = self.io.open(r, **fdopen_kwargs)
2869 os.write(w, b"foo")
2870 signal.alarm(1)
2871 # Expected behaviour:
2872 # - first raw read() returns partial b"foo"
2873 # - second raw read() returns EINTR
2874 # - third raw read() returns b"bar"
2875 self.assertEqual(decode(rio.read(6)), "foobar")
2876 finally:
2877 rio.close()
2878 os.close(w)
2879 os.close(r)
2880
Antoine Pitrou20db5112011-08-19 20:32:34 +02002881 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00002882 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2883 mode="rb")
2884
Antoine Pitrou20db5112011-08-19 20:32:34 +02002885 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00002886 self.check_interrupted_read_retry(lambda x: x,
2887 mode="r")
2888
2889 @unittest.skipUnless(threading, 'Threading required for this test.')
2890 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2891 """Check that a buffered write, when it gets interrupted (either
2892 returning a partial result or EINTR), properly invokes the signal
2893 handler and retries if the latter returned successfully."""
2894 select = support.import_module("select")
2895 # A quantity that exceeds the buffer size of an anonymous pipe's
2896 # write end.
2897 N = 1024 * 1024
2898 r, w = os.pipe()
2899 fdopen_kwargs["closefd"] = False
2900 # We need a separate thread to read from the pipe and allow the
2901 # write() to finish. This thread is started after the SIGALRM is
2902 # received (forcing a first EINTR in write()).
2903 read_results = []
2904 write_finished = False
2905 def _read():
2906 while not write_finished:
2907 while r in select.select([r], [], [], 1.0)[0]:
2908 s = os.read(r, 1024)
2909 read_results.append(s)
2910 t = threading.Thread(target=_read)
2911 t.daemon = True
2912 def alarm1(sig, frame):
2913 signal.signal(signal.SIGALRM, alarm2)
2914 signal.alarm(1)
2915 def alarm2(sig, frame):
2916 t.start()
2917 signal.signal(signal.SIGALRM, alarm1)
2918 try:
2919 wio = self.io.open(w, **fdopen_kwargs)
2920 signal.alarm(1)
2921 # Expected behaviour:
2922 # - first raw write() is partial (because of the limited pipe buffer
2923 # and the first alarm)
2924 # - second raw write() returns EINTR (because of the second alarm)
2925 # - subsequent write()s are successful (either partial or complete)
2926 self.assertEqual(N, wio.write(item * N))
2927 wio.flush()
2928 write_finished = True
2929 t.join()
2930 self.assertEqual(N, sum(len(x) for x in read_results))
2931 finally:
2932 write_finished = True
2933 os.close(w)
2934 os.close(r)
2935 # This is deliberate. If we didn't close the file descriptor
2936 # before closing wio, wio would try to flush its internal
2937 # buffer, and could block (in case of failure).
2938 try:
2939 wio.close()
2940 except IOError as e:
2941 if e.errno != errno.EBADF:
2942 raise
2943
Antoine Pitrou20db5112011-08-19 20:32:34 +02002944 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00002945 self.check_interrupted_write_retry(b"x", mode="wb")
2946
Antoine Pitrou20db5112011-08-19 20:32:34 +02002947 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00002948 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2949
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002950
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002951class CSignalsTest(SignalsTest):
2952 io = io
2953
2954class PySignalsTest(SignalsTest):
2955 io = pyio
2956
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002957 # Handling reentrancy issues would slow down _pyio even more, so the
2958 # tests are disabled.
2959 test_reentrant_write_buffered = None
2960 test_reentrant_write_text = None
2961
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002962
Guido van Rossum28524c72007-02-27 05:47:44 +00002963def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002964 tests = (CIOTest, PyIOTest,
2965 CBufferedReaderTest, PyBufferedReaderTest,
2966 CBufferedWriterTest, PyBufferedWriterTest,
2967 CBufferedRWPairTest, PyBufferedRWPairTest,
2968 CBufferedRandomTest, PyBufferedRandomTest,
2969 StatefulIncrementalDecoderTest,
2970 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2971 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002972 CMiscIOTest, PyMiscIOTest,
2973 CSignalsTest, PySignalsTest,
2974 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002975
2976 # Put the namespaces of the IO module we are testing and some useful mock
2977 # classes in the __dict__ of each test.
2978 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00002979 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002980 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2981 c_io_ns = {name : getattr(io, name) for name in all_members}
2982 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2983 globs = globals()
2984 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2985 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2986 # Avoid turning open into a bound method.
2987 py_io_ns["open"] = pyio.OpenWrapper
2988 for test in tests:
2989 if test.__name__.startswith("C"):
2990 for name, obj in c_io_ns.items():
2991 setattr(test, name, obj)
2992 elif test.__name__.startswith("Py"):
2993 for name, obj in py_io_ns.items():
2994 setattr(test, name, obj)
2995
2996 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002997
2998if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002999 test_main()