blob: 318f7a7571a07f104d6677577b327ca59a21264a [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000045
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size.

    Opens this source file in text mode and returns the ``_CHUNK_SIZE``
    attribute of the resulting text stream; the file is closed again
    before returning.
    """
    with open(__file__, "r", encoding="latin-1") as f:
        return f._CHUNK_SIZE
51
52
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    ``read_stack`` is a sequence of canned results served in order by
    readinto(); a ``None`` entry makes one readinto() call return None.
    Everything passed to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Total number of read calls, and how many of them happened after
        # the canned data had run out.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        """Record a bytes copy of *b* and report it as fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next canned chunk into *buf*.

        Returns the number of bytes stored, None when the next entry of
        the read stack is None, or 0 once the read stack is empty.  A
        chunk larger than *buf* is split: the part that fits is delivered
        and the remainder stays at the head of the stack.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        else:
            buf[:] = data[:max_len]
            self._read_stack[0] = data[max_len:]
            return max_len

    def truncate(self, pos=None):
        """Pretend to truncate; simply echoes *pos* back."""
        return pos
108
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    # Mock combined with RawIOBase from the C implementation (io).
    pass
111
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    # Mock combined with RawIOBase from the Python implementation (_pyio).
    pass
114
115
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit read() method."""

    def read(self, n=None):
        """Return the next canned chunk whole, ignoring *n*.

        Once the read stack is exhausted, counts the call as extraneous
        and returns b"".
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an empty stack is expected here; anything else (e.g.
            # KeyboardInterrupt) must not be swallowed.
            self._extraneous_reads += 1
            return b""
125
class CMockRawIO(MockRawIO, io.RawIOBase):
    # Mock combined with RawIOBase from the C implementation (io).
    pass
128
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    # Mock combined with RawIOBase from the Python implementation (_pyio).
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000131
Guido van Rossuma9e20242007-03-08 00:43:48 +0000132
class MisbehavedRawIO(MockRawIO):
    """A MockRawIO whose methods deliberately return bogus results."""

    def write(self, b):
        # Claims twice as many bytes as were actually recorded.
        return super().write(b) * 2

    def read(self, n=None):
        # Returns the canned chunk duplicated.
        return super().read(n) * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Performs the real readinto() but reports more bytes than the
        # buffer can hold.
        super().readinto(buf)
        return len(buf) * 5
149
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    # Misbehaved mock combined with RawIOBase from the C implementation.
    pass
152
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    # Misbehaved mock combined with RawIOBase from the Python implementation.
    pass
155
156
class CloseFailureIO(MockRawIO):
    """A MockRawIO whose first close() call raises IOError."""

    closed = 0

    def close(self):
        # Mark the object closed *before* raising so that later close()
        # calls are silent no-ops.
        if not self.closed:
            self.closed = 1
            raise IOError
164
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    # Close-failure mock combined with RawIOBase from the C implementation.
    pass
167
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    # Close-failure mock combined with RawIOBase from the Python
    # implementation.
    pass
170
171
class MockFileIO:
    """Mixin that logs the size of every read()/readinto() result.

    Meant to be combined with a class that supplies __init__(data),
    read() and readinto() (the subclasses in this file use BytesIO).
    ``read_history`` receives one entry per call: the length of the data
    returned by read() (None if read() returned None), or the raw return
    value of readinto().
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        res = super().read(n)
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        res = super().readinto(b)
        self.read_history.append(res)
        return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000187
class CMockFileIO(MockFileIO, io.BytesIO):
    # Read-logging mixin on top of BytesIO from the C implementation.
    pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000190
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    # Read-logging mixin on top of BytesIO from the Python implementation.
    pass
193
194
class MockUnseekableIO:
    """Mixin that makes a stream report itself as not seekable.

    seek() and tell() raise ``self.UnsupportedOperation``, which the
    concrete subclass is expected to provide as a class attribute.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        raise self.UnsupportedOperation("not seekable")

    def tell(self, *args):
        raise self.UnsupportedOperation("not seekable")
204
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    # Exception type raised by the mixin's seek()/tell().
    UnsupportedOperation = io.UnsupportedOperation
207
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    # Exception type raised by the mixin's seek()/tell().
    UnsupportedOperation = pyio.UnsupportedOperation
210
211
class MockNonBlockWriterIO:
    """Mock raw writer that can simulate a write that would block.

    Written data is accumulated in ``_write_stack``.  After block_on() has
    been called, the next write() whose data contains the blocker accepts
    only the bytes before it and raises ``self.BlockingIOError`` (supplied
    by the concrete subclass as a class attribute).
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and clear the record."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b*, or the part of it preceding the blocker.

        Returns len(b) when nothing blocks.  If a blocker is armed and
        occurs in *b*, the blocker is disarmed, the bytes before it are
        recorded and BlockingIOError(0, "test blocking", n) is raised,
        where n is the number of bytes recorded.
        """
        b = bytes(b)
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                pass
            else:
                self._blocker_char = None
                self._write_stack.append(b[:n])
                raise self.BlockingIOError(0, "test blocking", n)
        self._write_stack.append(b)
        return len(b)
250
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    # Exception type raised by the mock's write() when it "blocks".
    BlockingIOError = io.BlockingIOError
253
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    # Exception type raised by the mock's write() when it "blocks".
    BlockingIOError = pyio.BlockingIOError
256
Guido van Rossuma9e20242007-03-08 00:43:48 +0000257
class IOTest(unittest.TestCase):
    # Generic tests for the io module.  The methods refer to the objects
    # under test through attributes of the test case (self.open,
    # self.FileIO, self.BytesIO, self.IOBase, self.UnsupportedOperation,
    # self.SEEK_CUR, ...) which are not defined in this class; they are
    # expected to be supplied from outside it for each implementation.

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/tell/truncate scenario on writable *f*.

        Callers that inspect the result afterwards expect the content
        b"hello world\\n".
        """
        self.assertEqual(f.write(b"blah."), 5)
        # truncate() must not move the file position.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/readinto/seek scenario on readable *f*.

        *f* must contain b"hello world\\n".  With *buffered* true,
        argument-less read() calls are checked as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file scenario.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Run seek/tell/write/truncate checks around offset LARGE on *f*.

        *f* must be open for both reading and writing.
        """
        # assertTrue rather than a bare assert, so the precondition is
        # still checked when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_raw_file_io(self):
        # Unbuffered (buffering=0) binary file: capabilities, then the
        # shared write/read scenarios.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        # Same as test_raw_file_io, with default buffering.
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform.startswith('win') or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a real skip (as test_unbounded_file does) instead
                # of printing to stderr and silently passing.
                self.skipTest(
                    "Testing large file ops skipped on %s. "
                    "It requires %d bytes and a long time. "
                    "Use 'regrtest.py -u largefile test_io' to run it."
                    % (sys.platform, self.LARGE))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on leaving the with block, whether the
        # body completes normally or raises.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Destroying an unclosed FileIO subclass must call __del__, close()
        # and flush() in that order, and the data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check the __del__ -> close() -> flush() order for *base*.

        A subclass of *base* is instantiated and dropped; its overridden
        hooks record instance attributes, so the check also covers that
        those attributes are still available during destruction.
        """
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must accept an array and report its size in bytes.
        a = array.array('i', range(10))
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Reference cycle: only the garbage collector can free f.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        # Closing repeatedly is allowed; using the closed file is not.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_types_have_dict(self):
        test = (
            self.IOBase(),
            self.RawIOBase(),
            self.TextIOBase(),
            self.StringIO(),
            self.BytesIO()
        )
        for obj in test:
            self.assertTrue(hasattr(obj, "__dict__"))

    def test_opener(self):
        # The opener's descriptor must be used instead of opening the
        # (non-existent) path itself.
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        fd = os.open(support.TESTFN, os.O_RDONLY)
        def opener(path, flags):
            return fd
        with self.open("non-existent", "r", opener=opener) as f:
            self.assertEqual(f.read(), "egg\n")
632
class CIOTest(IOTest):
    # IOTest plus a regression test that only this subclass defines.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000652
class PyIOTest(IOTest):
    # Runs the inherited IOTest tests unchanged; adds nothing of its own.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000655
Guido van Rossuma9e20242007-03-08 00:43:48 +0000656
class CommonBufferedTests:
    # Mixin with the checks shared by the BufferedReader, BufferedWriter and
    # BufferedRandom test cases.  Every test reads the type under test from
    # `self.tp` and builds its raw streams through the mock classes exposed
    # on `self`.

    def test_detach(self):
        # detach() returns the raw stream; detaching twice is an error.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        self.assertIs(buffered.detach(), raw_stream)
        with self.assertRaises(ValueError):
            buffered.detach()

    def test_fileno(self):
        buffered = self.tp(self.MockRawIO())

        self.assertEqual(42, buffered.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        buffered = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 3):
            with self.assertRaises(ValueError):
                buffered.seek(0, bad_whence)

    def test_override_destructor(self):
        # Each overridden hook appends its own marker so that the order in
        # which the destructor machinery invokes them can be checked.
        base = self.tp
        calls = []

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                try:
                    parent_del = super().__del__
                except AttributeError:
                    return
                parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        # flush() is only expected in the record for writable objects.
        expected = [1, 2, 3] if writable else [1, 2]
        self.assertEqual(calls, expected)

    def test_context_manager(self):
        # Test usability as a context manager
        buffered = self.tp(self.MockRawIO())

        def enter_and_exit():
            with buffered:
                pass

        enter_and_exit()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        with self.assertRaises(ValueError):
            enter_and_exit()

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()

        def touch_missing_attribute():
            self.tp(rawio).xyzzy

        with support.captured_output("stderr") as captured:
            self.assertRaises(AttributeError, touch_missing_attribute)
        stderr_text = captured.getvalue().strip()
        if stderr_text:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(stderr_text.splitlines()), 1)
            self.assertTrue(stderr_text.startswith("Exception IOError: "),
                            stderr_text)
            self.assertTrue(stderr_text.endswith(" ignored"), stderr_text)

    def test_repr(self):
        # The repr shows the qualified class name, plus the raw stream's
        # name once one has been set.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(buffered), "<%s>" % clsname)
        raw_stream.name = "dummy"
        self.assertEqual(repr(buffered), "<%s name='dummy'>" % clsname)
        raw_stream.name = b"dummy"
        self.assertEqual(repr(buffered), "<%s name=b'dummy'>" % clsname)

    def test_flush_error_on_close(self):
        raw_stream = self.MockRawIO()

        def failing_flush():
            raise IOError()

        raw_stream.flush = failing_flush
        buffered = self.tp(raw_stream)
        # exception not swallowed
        with self.assertRaises(IOError):
            buffered.close()

    def test_multi_close(self):
        # Repeated close() calls are accepted; flush() afterwards is not.
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        buffered = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            buffered.tell()
        with self.assertRaises(self.UnsupportedOperation):
            buffered.seek(0)

    def test_readonly_attributes(self):
        buffered = self.tp(self.MockRawIO())
        # Built outside the assertRaises block so that only the assignment
        # itself is under test.
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
776
Guido van Rossum78892e42007-04-06 17:31:18 +0000777
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Read-side checks; the buffered type under test is taken from `self.tp`.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ can be re-run on a live object with positive buffer
        # sizes; zero and negative sizes are rejected.
        source = self.MockRawIO([b"abc"])
        reader = self.tp(source)
        reader.__init__(source)
        for size in (1024, 16):
            reader.__init__(source, buffer_size=size)
        self.assertEqual(b"abc", reader.read())
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                reader.__init__(source, buffer_size=bad_size)
        source = self.MockRawIO([b"abc"])
        reader.__init__(source)
        self.assertEqual(b"abc", reader.read())

    def test_read(self):
        for size in (None, 7):
            source = self.MockRawIO((b"abc", b"d", b"efg"))
            reader = self.tp(source)
            self.assertEqual(b"abcdefg", reader.read(size))
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read(-2)

    def test_read1(self):
        source = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(source)
        self.assertEqual(b"a", reader.read(1))
        self.assertEqual(b"b", reader.read1(1))
        self.assertEqual(source._reads, 1)
        # Each row: bytes expected from read1(100), then the raw read count
        # expected right after that call.
        steps = ((b"c", 1), (b"d", 2), (b"efg", 3), (b"", 4))
        for expected, raw_reads in steps:
            self.assertEqual(expected, reader.read1(100))
            self.assertEqual(source._reads, raw_reads)
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read1(-1)

    def test_readinto(self):
        target = bytearray(2)
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        # Each row: expected return value, then expected buffer contents.
        steps = ((2, b"ab"), (2, b"cd"), (2, b"ef"), (1, b"gf"), (0, b"gf"))
        for nread, contents in steps:
            self.assertEqual(reader.readinto(target), nread)
            self.assertEqual(target, contents)
        # Same buffer, but the raw stream now yields None after its data.
        reader = self.tp(self.MockRawIO((b"abc", None)))
        for nread, contents in ((2, b"ab"), (1, b"cb")):
            self.assertEqual(reader.readinto(target), nread)
            self.assertEqual(target, contents)

    def test_readlines(self):
        def fresh_reader():
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

        every_line = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(fresh_reader().readlines(), every_line)
        self.assertEqual(fresh_reader().readlines(5), every_line[:2])
        self.assertEqual(fresh_reader().readlines(None), every_line)

    def test_buffering(self):
        data = b"abcdefghi"
        total = len(data)

        # Each row: buffer size, sizes requested from the buffered reader,
        # and the read sizes the raw stream is expected to have recorded.
        scenarios = (
            (100, (3, 1, 4, 8), [total, 0]),
            (100, (3, 3, 3), [total]),
            (4, (1, 2, 4, 2), [4, 4, 1]),
        )

        for bufsize, requests, raw_history in scenarios:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            offset = 0
            for nbytes in requests:
                self.assertEqual(bufio.read(nbytes),
                                 data[offset:offset + nbytes])
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_history)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        source = self.MockRawIO(
            (b"abc", b"d", None, b"efg", None, None, None))
        reader = self.tp(source)
        self.assertEqual(b"abcd", reader.read(6))
        self.assertEqual(b"e", reader.read(1))
        self.assertEqual(b"fg", reader.read())
        self.assertEqual(b"", reader.peek(1))
        self.assertIsNone(reader.read())
        self.assertEqual(b"", reader.read())

        # readall() called directly on the raw mock.
        source = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", source.readall())
        self.assertIsNone(source.readall())

    def test_read_past_eof(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read(9000))

    def test_read_all(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode,
                           buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []

                def worker():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=worker)
                           for _ in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                for i in range(256):
                    needle = bytes(bytearray([i]))
                    self.assertEqual(combined.count(needle), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # Checked both before and after a read has been issued.
        reader = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            reader.tell()
        with self.assertRaises(self.UnsupportedOperation):
            reader.seek(0)
        reader.read(1)
        with self.assertRaises(self.UnsupportedOperation):
            reader.seek(0)
        with self.assertRaises(self.UnsupportedOperation):
            reader.tell()

    def test_misbehaved_io(self):
        reader = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        with self.assertRaises(IOError):
            reader.seek(0)
        with self.assertRaises(IOError):
            reader.tell()

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            layouts = (
                # Simple case: one raw read is enough to satisfy the request.
                [b"x" * n],
                # A more complex case where two raw reads are needed to
                # satisfy the request.
                [b"x" * (n - 1), b"x"],
            )
            for chunks in layouts:
                rawio = self.MockRawIO(chunks)
                bufio = self.tp(rawio, bufsize)
                self.assertEqual(bufio.read(n), b"x" * n)
                self.assertEqual(rawio._extraneous_reads, 0,
                    "failed for {}: {} != 0".format(
                        n, rawio._extraneous_reads))
973
974
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the BufferedReaderTest suite against io.BufferedReader and adds
    # checks that only apply to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialisation the object must refuse reads.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # Opening with "w+b" creates support.TESTFN on disk; register its
        # removal up front so the file goes away even if an assertion fails.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Self-reference: the object is only reclaimable by the cycle
        # collector.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001015
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the BufferedReaderTest suite against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001018
Guido van Rossuma9e20242007-03-08 00:43:48 +00001019
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Write-side checks; the buffered type under test is taken from
    # `self.tp`.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ can be re-run on a live object with positive buffer
        # sizes; zero and negative sizes are rejected.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # Buffered data must reach the raw stream when detach() is called.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # `intermediate_func` is called with the buffered object after
        # every write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Both helpers move the position around and then restore it, once
        # with absolute seeks and once with relative seeks.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must push pending data to the raw
        # stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        #
        # The test creates support.TESTFN; register its removal up front so
        # the file goes away even if an assertion fails.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunks may land in any order, so only per-byte counts are
            # compared.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing the extra positional argument must emit the warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001235
1236
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the BufferedWriterTest suite against io.BufferedWriter and adds
    # checks that only apply to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialisation the object must refuse writes.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        #
        # The test creates support.TESTFN; register its removal up front so
        # the file goes away even if an assertion fails.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Self-reference: the object is only reclaimable by the cycle
        # collector.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1274
1275
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the BufferedWriterTest suite against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001278
class BufferedRWPairTest(unittest.TestCase):
    # Checks for the reader/writer pair type that concrete subclasses bind
    # to `tp`.  Pairs are built as self.tp(reader, writer).

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing the extra positional argument must emit the warning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # A fresh pair per call, since readlines() consumes the reader.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # An explicit None hint must behave like the default.  This replaces
        # an exact duplicate of the assertion above.
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # Only the prefix is checked, so the test still passes if peek()
        # returns more bytes than were asked for.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # The pair must report a tty when either side does.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001396
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the BufferedRWPairTest suite against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001399
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the BufferedRWPairTest suite against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001402
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001403
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared tests for a buffered stream that is both readable and writable.
    # Concrete subclasses provide ``tp`` (the class under test).
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Both parents define constructor checks; run each explicitly.
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        mock_raw = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(mock_raw, 8)

        self.assertEqual(b"as", stream.read(2))
        for payload in (b"ddd", b"eee"):
            stream.write(payload)
        # Nothing may have reached the raw stream yet: writes are buffered.
        self.assertFalse(mock_raw._write_stack)
        self.assertEqual(b"ghjk", stream.read())
        self.assertEqual(b"dddeee", mock_raw._write_stack[0])

    def test_seek_and_tell(self):
        backing = self.BytesIO(b"asdfghjkl")
        stream = self.tp(backing)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        # Overwrite the four bytes following the current position.
        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", backing.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, stream.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario: ``read_func(bufio, [n])`` is the read primitive
        # being exercised around interleaved write() and flush() calls.
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        self.assertEqual(b"ab", read_func(stream, 2))
        stream.write(b"12")
        self.assertEqual(b"ef", read_func(stream, 2))
        self.assertEqual(6, stream.tell())
        stream.flush()
        self.assertEqual(6, stream.tell())
        self.assertEqual(b"ghi", read_func(stream))
        # Modify the raw stream behind the buffered object's back.
        backing.seek(0, 0)
        backing.write(b"XYZ")
        # flush() resets the read buffer
        stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(stream, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # A negative size means "read everything": use a large buffer.
            capacity = n if n >= 0 else 9999
            target = bytearray(capacity)
            nread = bufio.readinto(target)
            return bytes(target[:nread])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        for payload in (b"123", b"45"):
            stream.write(payload)
            stream.flush()
        stream.seek(0, 0)
        expected = b"12345fghi"
        self.assertEqual(expected, backing.getvalue())
        self.assertEqual(expected, stream.read())

    def test_threads(self):
        # Run the threading tests of both parents.
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        def _peek_ahead(bufio):
            bufio.peek(1)
        self.check_writes(_peek_ahead)

        def _peek_behind(bufio):
            # Step back one byte, peek, then restore the position.
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)
        self.check_writes(_peek_behind)

    def test_writes_and_reads(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_reread_last)

    def test_writes_and_read1s(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_reread_last)

    def test_writes_and_readintos(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_reread_last)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in (1, 5):
            backing = self.BytesIO(b"A" * 10)
            stream = self.tp(backing, 4)
            position_after_write = overwrite_size + 1
            # Trigger readahead
            self.assertEqual(stream.read(1), b"A")
            self.assertEqual(stream.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            stream.write(b"B" * overwrite_size)
            self.assertEqual(stream.tell(), position_after_write)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            stream.flush()
            self.assertEqual(stream.tell(), position_after_write)
            contents = backing.getvalue()
            self.assertEqual(
                contents,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards /
        # writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for i in range(size):
            for j in range(i, size):
                backing = self.BytesIO(original)
                stream = self.tp(backing, 100)
                mutate(stream, i, j)
                stream.flush()
                expected = bytearray(original)
                # Order matters when i == j: the later write (1) wins.
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(backing.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        backing = self.BytesIO(b"A" * 10)
        stream = self.tp(backing, 100)
        # the read buffer gets filled
        self.assertEqual(stream.read(2), b"AA")
        self.assertEqual(stream.truncate(), 2)
        # the write buffer increases
        self.assertEqual(stream.write(b"BB"), 2)
        self.assertEqual(stream.truncate(), 4)

    def test_misbehaved_io(self):
        # Run the misbehaved-raw-stream tests of both parents.
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as backing:
            with self.tp(backing, 100) as f:
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                one_byte = bytearray(1)
                f.readinto(one_byte)
                self.assertEqual(one_byte, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(backing.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as backing:
            with self.tp(backing, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(backing.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Alternate one-byte writes with readline() calls.
        with self.BytesIO(b'ab\ncdef\ng\n') as backing:
            with self.tp(backing) as f:
                f.write(b'1')
                self.assertEqual(f.readline(), b'b\n')
                f.write(b'2')
                self.assertEqual(f.readline(), b'def\n')
                f.write(b'3')
                self.assertEqual(f.readline(), b'\n')
                f.flush()
                self.assertEqual(backing.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
1627
class CBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandom tests against the ``io`` module's class.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel, so only check larger builds.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
                          bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Run the garbage-collection tests of both C parents.
        CBufferedReaderTest.test_garbage_collection(self)
        CBufferedWriterTest.test_garbage_collection(self)
1644
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandom tests against the ``pyio`` module's class.
    tp = pyio.BufferedRandom
1647
1648
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001649# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1650# properties:
1651# - A single output character can correspond to many bytes of input.
1652# - The number of input bytes to complete the character can be
1653# undetermined until the last input byte is received.
1654# - The number of input bytes can vary depending on previous input.
1655# - A single input byte can correspond to many characters of output.
1656# - The number of output characters can be undetermined until the
1657# last input byte is received.
1658# - The number of output characters can vary depending on previous input.
1659
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A new I applies to the bytes that
    follow the word that set it.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Restore the initial state: I = O = 1 and an empty word buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return ``(buffered_bytes, flags)`` where flags encodes I and O."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return the text decoded so far.

        When *final* is true, a partially buffered word is emitted as well.
        """
        period = ord('.')
        pieces = []
        for b in input:
            # self.i is re-read for every byte: process_word() may change it.
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer: # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i...' and 'o...' words update I and O and produce no text.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Tests flip this on to make lookupTestDecoder() answer.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: resolve 'test_decoder' while enabled.

        Returns None (no match) for any other name or while disabled.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
1744
# Hook the test decoder's lookup function into the codec registry.
# The lookup answers only while StatefulIncrementalDecoder.codecEnabled is
# set, which is off by default; tests turn it on when they need it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1748
1749
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         ''.join([
             # Words padded with hyphens to O=29 ...
             'a'.ljust(29, '-') + '.',
             'b'.ljust(29, '-') + '.',
             'cde'.ljust(29, '-') + '.',
             # ... then truncated or padded to O=15.
             'abcdefghijabcde.',
             'a.b'.ljust(15, '-') + '.',
             '.c.'.ljust(15, '-') + '.',
             'd.e'.ljust(15, '-') + '.',
             'k'.ljust(15, '-') + '.',
             'l'.ljust(15, '-') + '.',
             'm'.ljust(15, '-') + '.',
         ])),
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, at_eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, at_eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001792
1793class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001794
def setUp(self):
    # Raw sample mixing "\r\n", "\r" and "\n" line endings, and the same
    # text with every line ending written as "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
    # Start every test without a leftover scratch file.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001799
def tearDown(self):
    # Remove the scratch file a test may have created.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001802
def test_constructor(self):
    # Re-running __init__ on an existing wrapper must apply the new
    # settings, and invalid ``newline`` values must be rejected.
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    text.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(text.encoding, "latin-1")
    self.assertEqual(text.line_buffering, False)

    text.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(text.encoding, "utf-8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())

    self.assertRaises(TypeError, text.__init__, buffered, newline=42)
    self.assertRaises(ValueError, text.__init__, buffered, newline='xyzzy')
1816
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)

    # detach() hands back the very object that was wrapped.
    text = self.TextIOWrapper(buffered)
    self.assertIs(text.detach(), buffered)

    # Pending text reaches the raw stream on detach(); a second detach()
    # is an error.
    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    self.assertFalse(raw.getvalue())
    text.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    self.assertRaises(ValueError, text.detach)
1829
def test_repr(self):
    # repr() grows as ``name`` and ``mode`` become available.
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(text), expected)

    text.mode = "r"
    expected = (
        "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
    self.assertEqual(repr(text), expected)

    # A bytes name is shown with its b'' prefix.
    raw.name = b"dummy"
    expected = (
        "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
    self.assertEqual(repr(text), expected)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001846
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)

    text.write("X")
    # No flush happened
    self.assertEqual(raw.getvalue(), b"")
    text.write("Y\nZ")
    # All got flushed
    self.assertEqual(raw.getvalue(), b"XY\nZ")
    # A write containing "\r" is flushed through as well.
    text.write("A\rB")
    self.assertEqual(raw.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001857
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="utf-8")
    self.assertEqual(t.encoding, "utf-8")
    # Without an explicit encoding, a default must still be reported.
    t = self.TextIOWrapper(b)
    # assertIsNotNone reports the offending value on failure, unlike
    # assertTrue(x is not None).
    self.assertIsNotNone(t.encoding)
    # lookup() raises LookupError if the reported name is not a real codec.
    codecs.lookup(t.encoding)
1866
def test_encoding_errors_reading(self):
    # b"\xff" is not valid ASCII; check each error-handling policy on read.
    data = b"abc\n\xff\n"

    # (1) default, (2) explicit strict: decoding fails.
    for extra in ({}, {"errors": "strict"}):
        t = self.TextIOWrapper(self.BytesIO(data), encoding="ascii", **extra)
        self.assertRaises(UnicodeError, t.read)

    # (3) ignore drops the bad byte, (4) replace substitutes U+FFFD.
    outcomes = (
        ("ignore", "abc\n\n"),
        ("replace", "abc\n\ufffd\n"),
    )
    for policy, expected in outcomes:
        t = self.TextIOWrapper(self.BytesIO(data), encoding="ascii",
                               errors=policy)
        self.assertEqual(t.read(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001884
def test_encoding_errors_writing(self):
    # "\xff" cannot be encoded as ASCII; check each error-handling policy
    # on write.

    # (1) default, (2) explicit strict: encoding fails.
    for extra in ({}, {"errors": "strict"}):
        t = self.TextIOWrapper(self.BytesIO(), encoding="ascii", **extra)
        self.assertRaises(UnicodeError, t.write, "\xff")

    # (3) ignore drops the character, (4) replace writes "?" instead.
    outcomes = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for policy, expected in outcomes:
        sink = self.BytesIO()
        t = self.TextIOWrapper(sink, encoding="ascii", errors=policy,
                               newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(sink.getvalue(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001908
def test_newlines(self):
    # Read the same text back under every ``newline`` setting, for several
    # encodings and buffer sizes, both by iteration and by mixing read()
    # with readline().
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # (newline argument, lines expected when reading with it)
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        # str.encode() already returns bytes; no extra conversion needed.
        data = ''.join(input_lines).encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        # Take two characters with read(), then the rest
                        # of the line with readline().
                        got_lines = []
                        while True:
                            c2 = textio.read(2)
                            if c2 == '':
                                break
                            self.assertEqual(len(c2), 2)
                            got_lines.append(c2 + textio.readline())
                    else:
                        got_lines = list(textio)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    # zip() stops at the shorter list, so compare lengths too.
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001950
def test_newlines_input(self):
    # readlines() and read() must agree with each ``newline`` setting.
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    expectations = [
        (None, normalized.decode("ascii").splitlines(keepends=True)),
        ("", testdata.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in expectations:
        raw = self.BytesIO(testdata)
        text = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        self.assertEqual(text.readlines(), expected)
        # Rewind and check that a single read() yields the same text.
        text.seek(0)
        self.assertEqual(text.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001966
def test_newlines_output(self):
    # Bytes expected on disk for each ``newline`` setting.
    expected_by_newline = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to behave like the platform's os.linesep.
    cases = [(None, expected_by_newline[os.linesep])]
    cases += sorted(expected_by_newline.items())
    for newline, expected in cases:
        sink = self.BytesIO()
        text = self.TextIOWrapper(sink, encoding="ascii", newline=newline)
        for chunk in ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ"):
            text.write(chunk)
        text.flush()
        self.assertEqual(sink.closed, False)
        self.assertEqual(sink.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001984
def test_destructor(self):
    # Dropping the last reference to the wrapper must close the wrapped
    # stream, and the pending text must have reached it by then.
    seen_at_close = []
    base = self.BytesIO

    class MyBytesIO(base):
        def close(self):
            # Record the contents at the moment close() is called.
            seen_at_close.append(self.getvalue())
            base.close(self)

    wrapper = self.TextIOWrapper(MyBytesIO(), encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001998
def test_override_destructor(self):
    # Overridden __del__, close() and flush() must run in that order when
    # the object is destroyed.
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base class may not define __del__ at all.
            try:
                parent_del = super().__del__
            except AttributeError:
                return
            parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    text = MyTextIO(raw, encoding="ascii")
    del text
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2021
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def access_missing_attribute():
        # The temporary wrapper is destroyed while AttributeError propagates.
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, access_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00002036
Guido van Rossum9b76da62007-04-11 01:09:03 +00002037 # Systematic tests of the text I/O API
2038
def test_basic_io(self):
    # Round-trip a short string through the scratch file for several
    # encodings and chunk sizes, checking write/read/seek/tell on the way.
    # Files are opened in ``with`` blocks so they are closed even when an
    # assertion fails part-way through.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # "utf-16-be" and "utf-16-le" are currently not exercised here.
        for enc in ("ascii", "latin-1", "utf-8"):
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Opaque position of the end of the initial contents.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append, then read the appended text back from the cookie.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2067
def multi_line_test(self, f, enc):
    # Helper: empty ``f``, write lines of many different lengths while
    # recording tell() before each one, then read them back with readline()
    # and check that both the positions and the lines match.
    # ``enc`` is accepted for the caller's convenience and not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Repeat the sample characters cyclically up to ``size`` characters.
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002089
def test_telling(self):
    # tell() must report, while reading the file back, the same positions
    # it reported while the lines were being written.
    # The file is opened in a with-block so that it is closed even when
    # one of the assertions fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # While the file is being iterated, tell() is expected to fail.
            self.assertRaises(IOError, f.tell)
        # Once iteration is over, tell() works again.
        self.assertEqual(f.tell(), p2)
2109
def test_seeking(self):
    # Each line is an ASCII prefix two characters shorter than the default
    # chunk size, followed by a multi-byte character and a newline.  After
    # reading exactly the prefix, tell() must equal the prefix length and
    # readline() must return the suffix.
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    # Pure ASCII: one byte per character.
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    payload = (prefix + u_suffix.encode("utf-8")) * 2
    with self.open(support.TESTFN, "wb") as raw:
        raw.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as text:
        head = text.read(prefix_size)
        self.assertEqual(head, str(prefix, "ascii"))
        self.assertEqual(text.tell(), prefix_size)
        self.assertEqual(text.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002126
def test_seeking_too(self):
    # Regression test for a specific bug: readline() followed by tell()
    # with a chunk size smaller than the three-byte character in the file.
    with self.open(support.TESTFN, "wb") as raw:
        raw.write(b'\xe0\xbf\xbf\n')
    with self.open(support.TESTFN, "r", encoding="utf-8") as text:
        text._CHUNK_SIZE  # Just test that it exists
        text._CHUNK_SIZE = 2
        text.readline()
        text.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002137
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a with-block so that a failing
        # assertion cannot leave it open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must replay the rest.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(case_input)

        # Position each test case so that it crosses a chunk boundary.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(case_input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + case_input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002184
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002185 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002186 data = "1234567890"
2187 tests = ("utf-16",
2188 "utf-16-le",
2189 "utf-16-be",
2190 "utf-32",
2191 "utf-32-le",
2192 "utf-32-be")
2193 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002194 buf = self.BytesIO()
2195 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002196 # Check if the BOM is written only once (see issue1753).
2197 f.write(data)
2198 f.write(data)
2199 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002200 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002201 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002202 self.assertEqual(f.read(), data * 2)
2203 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002204
Benjamin Petersona1b49012009-03-31 23:11:32 +00002205 def test_unreadable(self):
2206 class UnReadable(self.BytesIO):
2207 def readable(self):
2208 return False
2209 txt = self.TextIOWrapper(UnReadable())
2210 self.assertRaises(IOError, txt.read)
2211
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002212 def test_read_one_by_one(self):
2213 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002214 reads = ""
2215 while True:
2216 c = txt.read(1)
2217 if not c:
2218 break
2219 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002220 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002221
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002222 def test_readlines(self):
2223 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2224 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2225 txt.seek(0)
2226 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2227 txt.seek(0)
2228 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2229
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002230 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002231 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002232 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002233 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002234 reads = ""
2235 while True:
2236 c = txt.read(128)
2237 if not c:
2238 break
2239 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002240 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002241
2242 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002243 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002244
2245 # read one char at a time
2246 reads = ""
2247 while True:
2248 c = txt.read(1)
2249 if not c:
2250 break
2251 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002252 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002253
2254 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002255 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002256 txt._CHUNK_SIZE = 4
2257
2258 reads = ""
2259 while True:
2260 c = txt.read(4)
2261 if not c:
2262 break
2263 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002264 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002265
2266 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002267 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002268 txt._CHUNK_SIZE = 4
2269
2270 reads = txt.read(4)
2271 reads += txt.read(4)
2272 reads += txt.readline()
2273 reads += txt.readline()
2274 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002275 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002276
2277 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002278 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002279 txt._CHUNK_SIZE = 4
2280
2281 reads = txt.read(4)
2282 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002283 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002284
2285 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002286 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002287 txt._CHUNK_SIZE = 4
2288
2289 reads = txt.read(4)
2290 pos = txt.tell()
2291 txt.seek(0)
2292 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002293 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002294
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002295 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002296 buffer = self.BytesIO(self.testdata)
2297 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002298
2299 self.assertEqual(buffer.seekable(), txt.seekable())
2300
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # The call is kept as in the original sequence; its result
            # is not used by this test.
            f.tell()
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        # Encoding the whole text once yields a single BOM, so equality
        # here means the append did not add a second one.
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002315
def test_seek_bom(self):
    # The BOM must not be written again when the file is reopened for
    # update and written to after seeking manually.
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_pos = writer.tell()
        with self.open(path, 'r+', encoding=charset) as updater:
            # First extend the file at its end, then overwrite its start.
            updater.seek(end_pos)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(path, 'rb') as reader:
            stored = reader.read()
            self.assertEqual(stored, 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002330
def test_errors_property(self):
    # The `errors` attribute is "strict" when nothing is passed to open()
    # and otherwise echoes the value that was passed.
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra, expected in cases:
        with self.open(support.TESTFN, "w", **extra) as f:
            self.assertEqual(f.errors, expected)
2336
@support.no_tracing
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Block until every thread has been started, so that the
            # writes happen as close together as possible.
            start.wait()
            f.write(text)
        workers = [threading.Thread(target=run, args=(x,))
                   for x in range(20)]
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        start.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Every thread's line must appear exactly once.
        for n in range(20):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002359
Antoine Pitrou6be88762010-05-03 16:48:20 +00002360 def test_flush_error_on_close(self):
2361 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2362 def bad_flush():
2363 raise IOError()
2364 txt.flush = bad_flush
2365 self.assertRaises(IOError, txt.close) # exception not swallowed
2366
2367 def test_multi_close(self):
2368 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2369 txt.close()
2370 txt.close()
2371 txt.close()
2372 self.assertRaises(ValueError, txt.flush)
2373
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002374 def test_unseekable(self):
2375 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2376 self.assertRaises(self.UnsupportedOperation, txt.tell)
2377 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2378
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002379 def test_readonly_attributes(self):
2380 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2381 buf = self.BytesIO(self.testdata)
2382 with self.assertRaises(AttributeError):
2383 txt.buffer = buf
2384
Antoine Pitroue96ec682011-07-23 21:46:35 +02002385 def test_rawio(self):
2386 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2387 # that subprocess.Popen() can have the required unbuffered
2388 # semantics with universal_newlines=True.
2389 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2390 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2391 # Reads
2392 self.assertEqual(txt.read(4), 'abcd')
2393 self.assertEqual(txt.readline(), 'efghi\n')
2394 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2395
2396 def test_rawio_write_through(self):
2397 # Issue #12591: with write_through=True, writes don't need a flush
2398 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2399 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2400 write_through=True)
2401 txt.write('1')
2402 txt.write('23\n4')
2403 txt.write('5')
2404 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2405
class CTextIOWrapperTest(TextIOWrapperTest):
    # TextIOWrapperTest plus tests that, per their own comments, target
    # the C implementation (failed re-initialization, garbage collection).

    def test_initialization(self):
        # A failed __init__() call on an existing wrapper must leave it
        # unusable: read() raises ValueError afterwards.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.read)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
        self.assertRaises(ValueError, t.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        b = self.BufferedWriter(rawio)
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("456def")
        # Self-reference: only the cyclic collector can free the wrapper.
        t.x = t
        wr = weakref.ref(t)
        del t
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t1 = self.TextIOWrapper(b1, encoding="ascii")
            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t2 = self.TextIOWrapper(b2, encoding="ascii")
            # circular references
            t1.buddy = t2
            t2.buddy = t1
        support.gc_collect()
2446
2447
class PyTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest variant that defines no tests of its own.

    NOTE(review): presumably the Python-implementation counterpart of
    CTextIOWrapperTest; the io names it exercises are not bound in this
    class body, so they must be supplied elsewhere -- confirm.
    """
2450
2451
2452class IncrementalNewlineDecoderTest(unittest.TestCase):
2453
2454 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002455 # UTF-8 specific tests for a newline decoder
2456 def _check_decode(b, s, **kwargs):
2457 # We exercise getstate() / setstate() as well as decode()
2458 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002459 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002460 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002461 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002462
Antoine Pitrou180a3362008-12-14 16:36:46 +00002463 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002464
Antoine Pitrou180a3362008-12-14 16:36:46 +00002465 _check_decode(b'\xe8', "")
2466 _check_decode(b'\xa2', "")
2467 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002468
Antoine Pitrou180a3362008-12-14 16:36:46 +00002469 _check_decode(b'\xe8', "")
2470 _check_decode(b'\xa2', "")
2471 _check_decode(b'\x88', "\u8888")
2472
2473 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002474 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2475
Antoine Pitrou180a3362008-12-14 16:36:46 +00002476 decoder.reset()
2477 _check_decode(b'\n', "\n")
2478 _check_decode(b'\r', "")
2479 _check_decode(b'', "\n", final=True)
2480 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002481
Antoine Pitrou180a3362008-12-14 16:36:46 +00002482 _check_decode(b'\r', "")
2483 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002484
Antoine Pitrou180a3362008-12-14 16:36:46 +00002485 _check_decode(b'\r\r\n', "\n\n")
2486 _check_decode(b'\r', "")
2487 _check_decode(b'\r', "\n")
2488 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002489
Antoine Pitrou180a3362008-12-14 16:36:46 +00002490 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2491 _check_decode(b'\xe8\xa2\x88', "\u8888")
2492 _check_decode(b'\n', "\n")
2493 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2494 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002495
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002496 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002497 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002498 if encoding is not None:
2499 encoder = codecs.getincrementalencoder(encoding)()
2500 def _decode_bytewise(s):
2501 # Decode one byte at a time
2502 for b in encoder.encode(s):
2503 result.append(decoder.decode(bytes([b])))
2504 else:
2505 encoder = None
2506 def _decode_bytewise(s):
2507 # Decode one char at a time
2508 for c in s:
2509 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002510 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002511 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002512 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002513 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002514 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002515 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002516 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002517 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002518 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002519 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002520 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002521 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002522 input = "abc"
2523 if encoder is not None:
2524 encoder.reset()
2525 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002526 self.assertEqual(decoder.decode(input), "abc")
2527 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002528
2529 def test_newline_decoder(self):
2530 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002531 # None meaning the IncrementalNewlineDecoder takes unicode input
2532 # rather than bytes input
2533 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002534 'utf-16', 'utf-16-le', 'utf-16-be',
2535 'utf-32', 'utf-32-le', 'utf-32-be',
2536 )
2537 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002538 decoder = enc and codecs.getincrementaldecoder(enc)()
2539 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2540 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002541 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002542 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2543 self.check_newline_decoding_utf8(decoder)
2544
Antoine Pitrou66913e22009-03-06 23:40:56 +00002545 def test_newline_bytes(self):
2546 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2547 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002548 self.assertEqual(dec.newlines, None)
2549 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2550 self.assertEqual(dec.newlines, None)
2551 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2552 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002553 dec = self.IncrementalNewlineDecoder(None, translate=False)
2554 _check(dec)
2555 dec = self.IncrementalNewlineDecoder(None, translate=True)
2556 _check(dec)
2557
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest variant with no tests of its own.

    NOTE(review): presumably the C-implementation variant; the decoder
    class under test is not bound in this class body -- confirm where it
    is supplied.
    """
2560
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest variant with no tests of its own.

    NOTE(review): presumably the Python-implementation variant; the
    decoder class under test is not bound in this class body -- confirm
    where it is supplied.
    """
Antoine Pitrou180a3362008-12-14 16:36:46 +00002563
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002564
Guido van Rossum01a27522007-03-07 01:00:12 +00002565# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002566
Guido van Rossum5abbf752007-08-27 17:39:33 +00002567class MiscIOTest(unittest.TestCase):
2568
Barry Warsaw40e82462008-11-20 20:14:50 +00002569 def tearDown(self):
2570 support.unlink(support.TESTFN)
2571
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002572 def test___all__(self):
2573 for name in self.io.__all__:
2574 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002575 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002576 if name == "open":
2577 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002578 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002579 self.assertTrue(issubclass(obj, Exception), name)
2580 elif not name.startswith("SEEK_"):
2581 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002582
Barry Warsaw40e82462008-11-20 20:14:50 +00002583 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002584 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002585 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002586 f.close()
2587
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002588 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002589 self.assertEqual(f.name, support.TESTFN)
2590 self.assertEqual(f.buffer.name, support.TESTFN)
2591 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2592 self.assertEqual(f.mode, "U")
2593 self.assertEqual(f.buffer.mode, "rb")
2594 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002595 f.close()
2596
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002597 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002598 self.assertEqual(f.mode, "w+")
2599 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2600 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002601
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002602 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002603 self.assertEqual(g.mode, "wb")
2604 self.assertEqual(g.raw.mode, "wb")
2605 self.assertEqual(g.name, f.fileno())
2606 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002607 f.close()
2608 g.close()
2609
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002610 def test_io_after_close(self):
2611 for kwargs in [
2612 {"mode": "w"},
2613 {"mode": "wb"},
2614 {"mode": "w", "buffering": 1},
2615 {"mode": "w", "buffering": 2},
2616 {"mode": "wb", "buffering": 0},
2617 {"mode": "r"},
2618 {"mode": "rb"},
2619 {"mode": "r", "buffering": 1},
2620 {"mode": "r", "buffering": 2},
2621 {"mode": "rb", "buffering": 0},
2622 {"mode": "w+"},
2623 {"mode": "w+b"},
2624 {"mode": "w+", "buffering": 1},
2625 {"mode": "w+", "buffering": 2},
2626 {"mode": "w+b", "buffering": 0},
2627 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002628 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002629 f.close()
2630 self.assertRaises(ValueError, f.flush)
2631 self.assertRaises(ValueError, f.fileno)
2632 self.assertRaises(ValueError, f.isatty)
2633 self.assertRaises(ValueError, f.__iter__)
2634 if hasattr(f, "peek"):
2635 self.assertRaises(ValueError, f.peek, 1)
2636 self.assertRaises(ValueError, f.read)
2637 if hasattr(f, "read1"):
2638 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002639 if hasattr(f, "readall"):
2640 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002641 if hasattr(f, "readinto"):
2642 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2643 self.assertRaises(ValueError, f.readline)
2644 self.assertRaises(ValueError, f.readlines)
2645 self.assertRaises(ValueError, f.seek, 0)
2646 self.assertRaises(ValueError, f.tell)
2647 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002648 self.assertRaises(ValueError, f.write,
2649 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002650 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002651 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002652
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002653 def test_blockingioerror(self):
2654 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002655 class C(str):
2656 pass
2657 c = C("")
2658 b = self.BlockingIOError(1, c)
2659 c.b = b
2660 b.c = c
2661 wr = weakref.ref(c)
2662 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002663 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002664 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002665
2666 def test_abcs(self):
2667 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002668 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2669 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2670 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2671 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002672
2673 def _check_abc_inheritance(self, abcmodule):
2674 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002675 self.assertIsInstance(f, abcmodule.IOBase)
2676 self.assertIsInstance(f, abcmodule.RawIOBase)
2677 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2678 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002679 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002680 self.assertIsInstance(f, abcmodule.IOBase)
2681 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2682 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2683 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002684 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002685 self.assertIsInstance(f, abcmodule.IOBase)
2686 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2687 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2688 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002689
2690 def test_abc_inheritance(self):
2691 # Test implementations inherit from their respective ABCs
2692 self._check_abc_inheritance(self)
2693
2694 def test_abc_inheritance_official(self):
2695 # Test implementations inherit from the official ABCs of the
2696 # baseline "io" module.
2697 self._check_abc_inheritance(io)
2698
Antoine Pitroue033e062010-10-29 10:38:18 +00002699 def _check_warn_on_dealloc(self, *args, **kwargs):
2700 f = open(*args, **kwargs)
2701 r = repr(f)
2702 with self.assertWarns(ResourceWarning) as cm:
2703 f = None
2704 support.gc_collect()
2705 self.assertIn(r, str(cm.warning.args[0]))
2706
2707 def test_warn_on_dealloc(self):
2708 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2709 self._check_warn_on_dealloc(support.TESTFN, "wb")
2710 self._check_warn_on_dealloc(support.TESTFN, "w")
2711
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
    # Helper: like _check_warn_on_dealloc(), but the file object wraps the
    # read end of a pipe.  It first checks that leaking the object warns,
    # then that it does NOT warn when the object was opened with
    # closefd=False.  *args/**kwargs are forwarded to open() after the fd.
    fds = []
    def cleanup_fds():
        # Close every descriptor created below.  Some of them may already
        # have been closed by the file objects themselves, so EBADF is
        # expected and ignored.
        for fd in fds:
            try:
                os.close(fd)
            except EnvironmentError as e:
                if e.errno != errno.EBADF:
                    raise
    self.addCleanup(cleanup_fds)
    r, w = os.pipe()
    fds += r, w
    self._check_warn_on_dealloc(r, *args, **kwargs)
    # When using closefd=False, there's no warning
    r, w = os.pipe()
    fds += r, w
    with warnings.catch_warnings(record=True) as recorded:
        # Without this, a filter that ignores ResourceWarning would make
        # the assertion below pass even if a warning had been issued.
        warnings.simplefilter("always", ResourceWarning)
        open(r, *args, closefd=False, **kwargs)
        support.gc_collect()
    self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002732
def test_warn_on_dealloc_fd(self):
    # Same check for an unbuffered binary, a buffered binary and a text
    # reader wrapped around a pipe file descriptor, in that order.
    open_variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, extra in open_variants:
        self._check_warn_on_dealloc_fd(mode, **extra)
2737
2738
def test_pickling(self):
    # Pickling file objects is forbidden, whatever the mode, the
    # buffering and the pickle protocol.
    open_kwargs = []
    # Write modes come first so that the file exists by the time it is
    # opened for reading.
    for base_mode in ("w", "r", "w+"):
        binary_mode = base_mode + "b"
        open_kwargs.append({"mode": base_mode})
        open_kwargs.append({"mode": binary_mode})
        open_kwargs.append({"mode": binary_mode, "buffering": 0})
    protocols = range(pickle.HIGHEST_PROTOCOL + 1)
    for kwargs in open_kwargs:
        for protocol in protocols:
            with self.open(support.TESTFN, **kwargs) as f:
                with self.assertRaises(TypeError):
                    pickle.dumps(f, protocol)
2755
class CMiscIOTest(MiscIOTest):
    # Runs the MiscIOTest cases with the `io` module as the implementation
    # under test (the "C" variant in this file's C*/Py* naming scheme).
    io = io
2758
class PyMiscIOTest(MiscIOTest):
    # Runs the MiscIOTest cases with the `pyio` module as the
    # implementation under test (the "Py" variant in this file's C*/Py*
    # naming scheme).
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002761
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002762
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Check how I/O objects behave when SIGALRM interrupts a read or write.

    Subclasses provide the implementation under test through the ``io``
    class attribute.  Every helper that arms an alarm cancels it again in
    its ``finally`` clause, so that a helper which exits early cannot leave
    a SIGALRM pending once tearDown() has restored the previous handler.
    """

    def setUp(self):
        # Install a handler that raises, remembering the previous one.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError from the signal handler.
        1/0

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data that is repeated and written, *bytes*
        is the byte string expected to come out of the pipe for it, and
        *fdopen_kwargs* are passed to ``self.io.open``."""
        read_results = []
        def _read():
            # Keep SIGALRM away from this thread when the platform allows
            # it, then consume a single byte from the pipe.
            if hasattr(signal, 'pthread_sigmask'):
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
            s = os.read(r, 1)
            read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # Bound before the try block so that the finally clause can tell
        # whether open() succeeded.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            t.start()
            signal.alarm(1)
            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            self.assertRaises(ZeroDivisionError,
                        wio.write, item * (support.PIPE_MAX_SIZE // len(item)))
            t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            # Cancel the alarm in case we got here before it fired.
            signal.alarm(0)
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check what happens when a signal handler writes to the same
        object that the interrupted code is writing to.

        Either the reentrant write is refused with a RuntimeError whose
        message starts with "reentrant call", or the ZeroDivisionError
        raised by the handler propagates."""
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel the alarm in case we got here before it fired.
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the result of read() to ``str`` for comparison;
        *fdopen_kwargs* are passed to ``self.io.open``."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that the finally clause can tell
        # whether open() succeeded.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            # Cancel the alarm in case we got here before it fired.
            signal.alarm(0)
            try:
                if rio is not None:
                    rio.close()
            finally:
                # Always release the pipe, even if rio.close() failed.
                os.close(w)
                os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is a one-character unit that is written N times;
        *fdopen_kwargs* are passed to ``self.io.open``."""
        select = support.import_module("select")
        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = 1024 * 1024
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        def _read():
            # Drain the pipe until the main thread reports that the write
            # is over and nothing more is readable.
            while not write_finished:
                while r in select.select([r], [], [], 1.0)[0]:
                    s = os.read(r, 1024)
                    read_results.append(s)
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: re-arm with the handler that starts the reader.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            t.start()
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that the finally clause can tell
        # whether open() succeeded.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            self.assertEqual(N, wio.write(item * N))
            wio.flush()
            write_finished = True
            t.join()
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            # Cancel whichever alarm may still be pending.
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except IOError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2952
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002953
class CSignalsTest(SignalsTest):
    # Runs the SignalsTest cases with the `io` module as the implementation
    # under test (the "C" variant in this file's C*/Py* naming scheme).
    io = io
2956
class PySignalsTest(SignalsTest):
    # Runs the SignalsTest cases with the `pyio` module as the
    # implementation under test (the "Py" variant in this file's C*/Py*
    # naming scheme).
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.  The inherited test methods are overridden with
    # None for that purpose.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
2964
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002965
def test_main():
    # Entry point: attach the right io namespace to every test class, then
    # run them all.
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = dict((name, getattr(io, name)) for name in all_members)
    py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
    # Each mock has a "C"- and a "Py"-prefixed flavour at module level;
    # both are exposed to the tests under the unprefixed name.
    globs = globals()
    for mock in mocks:
        base_name = mock.__name__
        c_io_ns[base_name] = globs["C" + base_name]
        py_io_ns[base_name] = globs["Py" + base_name]
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        test_name = test.__name__
        if test_name.startswith("C"):
            namespace = c_io_ns
        elif test_name.startswith("Py"):
            namespace = py_io_ns
        else:
            # Not tied to one implementation: nothing to inject.
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00003000
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()