blob: eb2ac5fe99d82499e94cd118aa12cdf98ee01750 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000045
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000047def _default_chunk_size():
48 """Get the default TextIOWrapper chunk size"""
49 with open(__file__, "r", encoding="latin1") as f:
50 return f._CHUNK_SIZE
51
52
Antoine Pitrou328ec742010-09-14 18:37:24 +000053class MockRawIOWithoutRead:
54 """A RawIO implementation without read(), so as to exercise the default
55 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000056
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000057 def __init__(self, read_stack=()):
58 self._read_stack = list(read_stack)
59 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000060 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000061 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000062
Guido van Rossum01a27522007-03-07 01:00:12 +000063 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000065 return len(b)
66
67 def writable(self):
68 return True
69
Guido van Rossum68bbcd22007-02-27 17:19:33 +000070 def fileno(self):
71 return 42
72
73 def readable(self):
74 return True
75
Guido van Rossum01a27522007-03-07 01:00:12 +000076 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000077 return True
78
Guido van Rossum01a27522007-03-07 01:00:12 +000079 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000080 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000081
82 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000083 return 0 # same comment as above
84
85 def readinto(self, buf):
86 self._reads += 1
87 max_len = len(buf)
88 try:
89 data = self._read_stack[0]
90 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000091 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000092 return 0
93 if data is None:
94 del self._read_stack[0]
95 return None
96 n = len(data)
97 if len(data) <= max_len:
98 del self._read_stack[0]
99 buf[:n] = data
100 return n
101 else:
102 buf[:] = data[:max_len]
103 self._read_stack[0] = data[max_len:]
104 return max_len
105
106 def truncate(self, pos=None):
107 return pos
108
Antoine Pitrou328ec742010-09-14 18:37:24 +0000109class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
110 pass
111
112class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
113 pass
114
115
116class MockRawIO(MockRawIOWithoutRead):
117
118 def read(self, n=None):
119 self._reads += 1
120 try:
121 return self._read_stack.pop(0)
122 except:
123 self._extraneous_reads += 1
124 return b""
125
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000126class CMockRawIO(MockRawIO, io.RawIOBase):
127 pass
128
129class PyMockRawIO(MockRawIO, pyio.RawIOBase):
130 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000131
Guido van Rossuma9e20242007-03-08 00:43:48 +0000132
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000133class MisbehavedRawIO(MockRawIO):
134 def write(self, b):
135 return super().write(b) * 2
136
137 def read(self, n=None):
138 return super().read(n) * 2
139
140 def seek(self, pos, whence):
141 return -123
142
143 def tell(self):
144 return -456
145
146 def readinto(self, buf):
147 super().readinto(buf)
148 return len(buf) * 5
149
150class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
151 pass
152
153class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
154 pass
155
156
157class CloseFailureIO(MockRawIO):
158 closed = 0
159
160 def close(self):
161 if not self.closed:
162 self.closed = 1
163 raise IOError
164
165class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
166 pass
167
168class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
169 pass
170
171
172class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000173
174 def __init__(self, data):
175 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000176 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000177
178 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000179 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000180 self.read_history.append(None if res is None else len(res))
181 return res
182
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000183 def readinto(self, b):
184 res = super().readinto(b)
185 self.read_history.append(res)
186 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188class CMockFileIO(MockFileIO, io.BytesIO):
189 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000190
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000191class PyMockFileIO(MockFileIO, pyio.BytesIO):
192 pass
193
194
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000195class MockUnseekableIO:
196 def seekable(self):
197 return False
198
199 def seek(self, *args):
200 raise self.UnsupportedOperation("not seekable")
201
202 def tell(self, *args):
203 raise self.UnsupportedOperation("not seekable")
204
205class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
206 UnsupportedOperation = io.UnsupportedOperation
207
208class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
209 UnsupportedOperation = pyio.UnsupportedOperation
210
211
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000212class MockNonBlockWriterIO:
213
214 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000215 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000216 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000217
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000218 def pop_written(self):
219 s = b"".join(self._write_stack)
220 self._write_stack[:] = []
221 return s
222
223 def block_on(self, char):
224 """Block when a given char is encountered."""
225 self._blocker_char = char
226
227 def readable(self):
228 return True
229
230 def seekable(self):
231 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000232
Guido van Rossum01a27522007-03-07 01:00:12 +0000233 def writable(self):
234 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000235
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000236 def write(self, b):
237 b = bytes(b)
238 n = -1
239 if self._blocker_char:
240 try:
241 n = b.index(self._blocker_char)
242 except ValueError:
243 pass
244 else:
245 self._blocker_char = None
246 self._write_stack.append(b[:n])
247 raise self.BlockingIOError(0, "test blocking", n)
248 self._write_stack.append(b)
249 return len(b)
250
251class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
252 BlockingIOError = io.BlockingIOError
253
254class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
255 BlockingIOError = pyio.BlockingIOError
256
Guido van Rossuma9e20242007-03-08 00:43:48 +0000257
Guido van Rossum28524c72007-02-27 05:47:44 +0000258class IOTest(unittest.TestCase):
259
Neal Norwitze7789b12008-03-24 06:18:09 +0000260 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000261 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000262
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000263 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000264 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000265
Guido van Rossum28524c72007-02-27 05:47:44 +0000266 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000267 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000268 f.truncate(0)
269 self.assertEqual(f.tell(), 5)
270 f.seek(0)
271
272 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000273 self.assertEqual(f.seek(0), 0)
274 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000275 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000276 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000277 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000278 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000279 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000280 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000281 self.assertEqual(f.seek(-1, 2), 13)
282 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000283
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000285 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000286 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000287
Guido van Rossum9b76da62007-04-11 01:09:03 +0000288 def read_ops(self, f, buffered=False):
289 data = f.read(5)
290 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000291 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000292 self.assertEqual(f.readinto(data), 5)
293 self.assertEqual(data, b" worl")
294 self.assertEqual(f.readinto(data), 2)
295 self.assertEqual(len(data), 5)
296 self.assertEqual(data[:2], b"d\n")
297 self.assertEqual(f.seek(0), 0)
298 self.assertEqual(f.read(20), b"hello world\n")
299 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000300 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000301 self.assertEqual(f.seek(-6, 2), 6)
302 self.assertEqual(f.read(5), b"world")
303 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000304 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000305 self.assertEqual(f.seek(-6, 1), 5)
306 self.assertEqual(f.read(5), b" worl")
307 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000308 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000309 if buffered:
310 f.seek(0)
311 self.assertEqual(f.read(), b"hello world\n")
312 f.seek(6)
313 self.assertEqual(f.read(), b"world\n")
314 self.assertEqual(f.read(), b"")
315
Guido van Rossum34d69e52007-04-10 20:08:41 +0000316 LARGE = 2**31
317
Guido van Rossum53807da2007-04-10 19:01:47 +0000318 def large_file_ops(self, f):
319 assert f.readable()
320 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000321 self.assertEqual(f.seek(self.LARGE), self.LARGE)
322 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000323 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000324 self.assertEqual(f.tell(), self.LARGE + 3)
325 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000326 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000327 self.assertEqual(f.tell(), self.LARGE + 2)
328 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000329 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000330 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000331 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
332 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000333 self.assertEqual(f.read(2), b"x")
334
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000335 def test_invalid_operations(self):
336 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000337 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000338 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000339 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000340 self.assertRaises(exc, fp.read)
341 self.assertRaises(exc, fp.readline)
342 with self.open(support.TESTFN, "wb", buffering=0) as fp:
343 self.assertRaises(exc, fp.read)
344 self.assertRaises(exc, fp.readline)
345 with self.open(support.TESTFN, "rb", buffering=0) as fp:
346 self.assertRaises(exc, fp.write, b"blah")
347 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000348 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000349 self.assertRaises(exc, fp.write, b"blah")
350 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000351 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000352 self.assertRaises(exc, fp.write, "blah")
353 self.assertRaises(exc, fp.writelines, ["blah\n"])
354 # Non-zero seeking from current or end pos
355 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
356 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000357
Guido van Rossum28524c72007-02-27 05:47:44 +0000358 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000359 with self.open(support.TESTFN, "wb", buffering=0) as f:
360 self.assertEqual(f.readable(), False)
361 self.assertEqual(f.writable(), True)
362 self.assertEqual(f.seekable(), True)
363 self.write_ops(f)
364 with self.open(support.TESTFN, "rb", buffering=0) as f:
365 self.assertEqual(f.readable(), True)
366 self.assertEqual(f.writable(), False)
367 self.assertEqual(f.seekable(), True)
368 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000369
Guido van Rossum87429772007-04-10 21:06:59 +0000370 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000371 with self.open(support.TESTFN, "wb") as f:
372 self.assertEqual(f.readable(), False)
373 self.assertEqual(f.writable(), True)
374 self.assertEqual(f.seekable(), True)
375 self.write_ops(f)
376 with self.open(support.TESTFN, "rb") as f:
377 self.assertEqual(f.readable(), True)
378 self.assertEqual(f.writable(), False)
379 self.assertEqual(f.seekable(), True)
380 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000381
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000382 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000383 with self.open(support.TESTFN, "wb") as f:
384 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
385 with self.open(support.TESTFN, "rb") as f:
386 self.assertEqual(f.readline(), b"abc\n")
387 self.assertEqual(f.readline(10), b"def\n")
388 self.assertEqual(f.readline(2), b"xy")
389 self.assertEqual(f.readline(4), b"zzy\n")
390 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000391 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000392 self.assertRaises(TypeError, f.readline, 5.3)
393 with self.open(support.TESTFN, "r") as f:
394 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000395
Guido van Rossum28524c72007-02-27 05:47:44 +0000396 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000397 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000398 self.write_ops(f)
399 data = f.getvalue()
400 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000401 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000402 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000403
Guido van Rossum53807da2007-04-10 19:01:47 +0000404 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000405 # On Windows and Mac OSX this test comsumes large resources; It takes
406 # a long time to build the >2GB file and takes >2GB of disk space
407 # therefore the resource must be enabled to run this test.
408 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000409 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000410 print("\nTesting large file ops skipped on %s." % sys.platform,
411 file=sys.stderr)
412 print("It requires %d bytes and a long time." % self.LARGE,
413 file=sys.stderr)
414 print("Use 'regrtest.py -u largefile test_io' to run it.",
415 file=sys.stderr)
416 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000417 with self.open(support.TESTFN, "w+b", 0) as f:
418 self.large_file_ops(f)
419 with self.open(support.TESTFN, "w+b") as f:
420 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000421
422 def test_with_open(self):
423 for bufsize in (0, 1, 100):
424 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000425 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000426 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000427 self.assertEqual(f.closed, True)
428 f = None
429 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000430 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000431 1/0
432 except ZeroDivisionError:
433 self.assertEqual(f.closed, True)
434 else:
435 self.fail("1/0 didn't raise an exception")
436
Antoine Pitrou08838b62009-01-21 00:55:13 +0000437 # issue 5008
438 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000439 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000440 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000441 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000442 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000443 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000444 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000445 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000446 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000447
Guido van Rossum87429772007-04-10 21:06:59 +0000448 def test_destructor(self):
449 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000450 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000451 def __del__(self):
452 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 try:
454 f = super().__del__
455 except AttributeError:
456 pass
457 else:
458 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000459 def close(self):
460 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000461 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000462 def flush(self):
463 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000464 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000465 with support.check_warnings(('', ResourceWarning)):
466 f = MyFileIO(support.TESTFN, "wb")
467 f.write(b"xxx")
468 del f
469 support.gc_collect()
470 self.assertEqual(record, [1, 2, 3])
471 with self.open(support.TESTFN, "rb") as f:
472 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000473
474 def _check_base_destructor(self, base):
475 record = []
476 class MyIO(base):
477 def __init__(self):
478 # This exercises the availability of attributes on object
479 # destruction.
480 # (in the C version, close() is called by the tp_dealloc
481 # function, not by __del__)
482 self.on_del = 1
483 self.on_close = 2
484 self.on_flush = 3
485 def __del__(self):
486 record.append(self.on_del)
487 try:
488 f = super().__del__
489 except AttributeError:
490 pass
491 else:
492 f()
493 def close(self):
494 record.append(self.on_close)
495 super().close()
496 def flush(self):
497 record.append(self.on_flush)
498 super().flush()
499 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000500 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000501 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000502 self.assertEqual(record, [1, 2, 3])
503
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000504 def test_IOBase_destructor(self):
505 self._check_base_destructor(self.IOBase)
506
507 def test_RawIOBase_destructor(self):
508 self._check_base_destructor(self.RawIOBase)
509
510 def test_BufferedIOBase_destructor(self):
511 self._check_base_destructor(self.BufferedIOBase)
512
513 def test_TextIOBase_destructor(self):
514 self._check_base_destructor(self.TextIOBase)
515
Guido van Rossum87429772007-04-10 21:06:59 +0000516 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000517 with self.open(support.TESTFN, "wb") as f:
518 f.write(b"xxx")
519 with self.open(support.TESTFN, "rb") as f:
520 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000521
Guido van Rossumd4103952007-04-12 05:44:49 +0000522 def test_array_writes(self):
523 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000524 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000525 with self.open(support.TESTFN, "wb", 0) as f:
526 self.assertEqual(f.write(a), n)
527 with self.open(support.TESTFN, "wb") as f:
528 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000529
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000530 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000531 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000532 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000533
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000534 def test_read_closed(self):
535 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000536 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000537 with self.open(support.TESTFN, "r") as f:
538 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000539 self.assertEqual(file.read(), "egg\n")
540 file.seek(0)
541 file.close()
542 self.assertRaises(ValueError, file.read)
543
544 def test_no_closefd_with_filename(self):
545 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000546 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000547
548 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000549 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000550 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000551 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000552 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000553 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000554 self.assertEqual(file.buffer.raw.closefd, False)
555
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000556 def test_garbage_collection(self):
557 # FileIO objects are collected, and collecting them flushes
558 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000559 with support.check_warnings(('', ResourceWarning)):
560 f = self.FileIO(support.TESTFN, "wb")
561 f.write(b"abcxxx")
562 f.f = f
563 wr = weakref.ref(f)
564 del f
565 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000566 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000567 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000568 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000569
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000570 def test_unbounded_file(self):
571 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
572 zero = "/dev/zero"
573 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000574 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000575 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000576 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000577 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000578 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000579 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000580 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000581 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000582 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000583 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000584 self.assertRaises(OverflowError, f.read)
585
Antoine Pitrou6be88762010-05-03 16:48:20 +0000586 def test_flush_error_on_close(self):
587 f = self.open(support.TESTFN, "wb", buffering=0)
588 def bad_flush():
589 raise IOError()
590 f.flush = bad_flush
591 self.assertRaises(IOError, f.close) # exception not swallowed
592
593 def test_multi_close(self):
594 f = self.open(support.TESTFN, "wb", buffering=0)
595 f.close()
596 f.close()
597 f.close()
598 self.assertRaises(ValueError, f.flush)
599
Antoine Pitrou328ec742010-09-14 18:37:24 +0000600 def test_RawIOBase_read(self):
601 # Exercise the default RawIOBase.read() implementation (which calls
602 # readinto() internally).
603 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
604 self.assertEqual(rawio.read(2), b"ab")
605 self.assertEqual(rawio.read(2), b"c")
606 self.assertEqual(rawio.read(2), b"d")
607 self.assertEqual(rawio.read(2), None)
608 self.assertEqual(rawio.read(2), b"ef")
609 self.assertEqual(rawio.read(2), b"g")
610 self.assertEqual(rawio.read(2), None)
611 self.assertEqual(rawio.read(2), b"")
612
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400613 def test_types_have_dict(self):
614 test = (
615 self.IOBase(),
616 self.RawIOBase(),
617 self.TextIOBase(),
618 self.StringIO(),
619 self.BytesIO()
620 )
621 for obj in test:
622 self.assertTrue(hasattr(obj, "__dict__"))
623
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000624class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200625
626 def test_IOBase_finalize(self):
627 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
628 # class which inherits IOBase and an object of this class are caught
629 # in a reference cycle and close() is already in the method cache.
630 class MyIO(self.IOBase):
631 def close(self):
632 pass
633
634 # create an instance to populate the method cache
635 MyIO()
636 obj = MyIO()
637 obj.obj = obj
638 wr = weakref.ref(obj)
639 del MyIO
640 del obj
641 support.gc_collect()
642 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000643
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000644class PyIOTest(IOTest):
645 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000646
Guido van Rossuma9e20242007-03-08 00:43:48 +0000647
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000648class CommonBufferedTests:
649 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
650
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000651 def test_detach(self):
652 raw = self.MockRawIO()
653 buf = self.tp(raw)
654 self.assertIs(buf.detach(), raw)
655 self.assertRaises(ValueError, buf.detach)
656
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000657 def test_fileno(self):
658 rawio = self.MockRawIO()
659 bufio = self.tp(rawio)
660
Ezio Melottib3aedd42010-11-20 19:04:17 +0000661 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000662
663 def test_no_fileno(self):
664 # XXX will we always have fileno() function? If so, kill
665 # this test. Else, write it.
666 pass
667
668 def test_invalid_args(self):
669 rawio = self.MockRawIO()
670 bufio = self.tp(rawio)
671 # Invalid whence
672 self.assertRaises(ValueError, bufio.seek, 0, -1)
673 self.assertRaises(ValueError, bufio.seek, 0, 3)
674
675 def test_override_destructor(self):
676 tp = self.tp
677 record = []
678 class MyBufferedIO(tp):
679 def __del__(self):
680 record.append(1)
681 try:
682 f = super().__del__
683 except AttributeError:
684 pass
685 else:
686 f()
687 def close(self):
688 record.append(2)
689 super().close()
690 def flush(self):
691 record.append(3)
692 super().flush()
693 rawio = self.MockRawIO()
694 bufio = MyBufferedIO(rawio)
695 writable = bufio.writable()
696 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000697 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000698 if writable:
699 self.assertEqual(record, [1, 2, 3])
700 else:
701 self.assertEqual(record, [1, 2])
702
703 def test_context_manager(self):
704 # Test usability as a context manager
705 rawio = self.MockRawIO()
706 bufio = self.tp(rawio)
707 def _with():
708 with bufio:
709 pass
710 _with()
711 # bufio should now be closed, and using it a second time should raise
712 # a ValueError.
713 self.assertRaises(ValueError, _with)
714
715 def test_error_through_destructor(self):
716 # Test that the exception state is not modified by a destructor,
717 # even if close() fails.
718 rawio = self.CloseFailureIO()
719 def f():
720 self.tp(rawio).xyzzy
721 with support.captured_output("stderr") as s:
722 self.assertRaises(AttributeError, f)
723 s = s.getvalue().strip()
724 if s:
725 # The destructor *may* have printed an unraisable error, check it
726 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000727 self.assertTrue(s.startswith("Exception IOError: "), s)
728 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000729
Antoine Pitrou716c4442009-05-23 19:04:03 +0000730 def test_repr(self):
731 raw = self.MockRawIO()
732 b = self.tp(raw)
733 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
734 self.assertEqual(repr(b), "<%s>" % clsname)
735 raw.name = "dummy"
736 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
737 raw.name = b"dummy"
738 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
739
Antoine Pitrou6be88762010-05-03 16:48:20 +0000740 def test_flush_error_on_close(self):
741 raw = self.MockRawIO()
742 def bad_flush():
743 raise IOError()
744 raw.flush = bad_flush
745 b = self.tp(raw)
746 self.assertRaises(IOError, b.close) # exception not swallowed
747
748 def test_multi_close(self):
749 raw = self.MockRawIO()
750 b = self.tp(raw)
751 b.close()
752 b.close()
753 b.close()
754 self.assertRaises(ValueError, b.flush)
755
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000756 def test_unseekable(self):
757 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
758 self.assertRaises(self.UnsupportedOperation, bufio.tell)
759 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
760
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000761 def test_readonly_attributes(self):
762 raw = self.MockRawIO()
763 buf = self.tp(raw)
764 x = self.MockRawIO()
765 with self.assertRaises(AttributeError):
766 buf.raw = x
767
Guido van Rossum78892e42007-04-06 17:31:18 +0000768
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000769class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
770 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000771
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000772 def test_constructor(self):
773 rawio = self.MockRawIO([b"abc"])
774 bufio = self.tp(rawio)
775 bufio.__init__(rawio)
776 bufio.__init__(rawio, buffer_size=1024)
777 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000778 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000779 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
780 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
781 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
782 rawio = self.MockRawIO([b"abc"])
783 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000784 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000785
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000786 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000787 for arg in (None, 7):
788 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
789 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000790 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000791 # Invalid args
792 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000793
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000794 def test_read1(self):
795 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
796 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000797 self.assertEqual(b"a", bufio.read(1))
798 self.assertEqual(b"b", bufio.read1(1))
799 self.assertEqual(rawio._reads, 1)
800 self.assertEqual(b"c", bufio.read1(100))
801 self.assertEqual(rawio._reads, 1)
802 self.assertEqual(b"d", bufio.read1(100))
803 self.assertEqual(rawio._reads, 2)
804 self.assertEqual(b"efg", bufio.read1(100))
805 self.assertEqual(rawio._reads, 3)
806 self.assertEqual(b"", bufio.read1(100))
807 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000808 # Invalid args
809 self.assertRaises(ValueError, bufio.read1, -1)
810
811 def test_readinto(self):
812 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
813 bufio = self.tp(rawio)
814 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000815 self.assertEqual(bufio.readinto(b), 2)
816 self.assertEqual(b, b"ab")
817 self.assertEqual(bufio.readinto(b), 2)
818 self.assertEqual(b, b"cd")
819 self.assertEqual(bufio.readinto(b), 2)
820 self.assertEqual(b, b"ef")
821 self.assertEqual(bufio.readinto(b), 1)
822 self.assertEqual(b, b"gf")
823 self.assertEqual(bufio.readinto(b), 0)
824 self.assertEqual(b, b"gf")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000825
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000826 def test_readlines(self):
827 def bufio():
828 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
829 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000830 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
831 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
832 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000833
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000834 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000835 data = b"abcdefghi"
836 dlen = len(data)
837
838 tests = [
839 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
840 [ 100, [ 3, 3, 3], [ dlen ] ],
841 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
842 ]
843
844 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000845 rawio = self.MockFileIO(data)
846 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000847 pos = 0
848 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000849 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000850 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000851 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000852 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000853
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000854 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000855 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000856 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
857 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000858 self.assertEqual(b"abcd", bufio.read(6))
859 self.assertEqual(b"e", bufio.read(1))
860 self.assertEqual(b"fg", bufio.read())
861 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200862 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000863 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000864
Victor Stinnera80987f2011-05-25 22:47:16 +0200865 rawio = self.MockRawIO((b"a", None, None))
866 self.assertEqual(b"a", rawio.readall())
867 self.assertIsNone(rawio.readall())
868
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000869 def test_read_past_eof(self):
870 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
871 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000872
Ezio Melottib3aedd42010-11-20 19:04:17 +0000873 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000874
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000875 def test_read_all(self):
876 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
877 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000878
Ezio Melottib3aedd42010-11-20 19:04:17 +0000879 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000880
Victor Stinner45df8202010-04-28 22:31:17 +0000881 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000882 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000883 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000884 try:
885 # Write out many bytes with exactly the same number of 0's,
886 # 1's... 255's. This will help us check that concurrent reading
887 # doesn't duplicate or forget contents.
888 N = 1000
889 l = list(range(256)) * N
890 random.shuffle(l)
891 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000892 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000893 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000894 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000895 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000896 errors = []
897 results = []
898 def f():
899 try:
900 # Intra-buffer read then buffer-flushing read
901 for n in cycle([1, 19]):
902 s = bufio.read(n)
903 if not s:
904 break
905 # list.append() is atomic
906 results.append(s)
907 except Exception as e:
908 errors.append(e)
909 raise
910 threads = [threading.Thread(target=f) for x in range(20)]
911 for t in threads:
912 t.start()
913 time.sleep(0.02) # yield
914 for t in threads:
915 t.join()
916 self.assertFalse(errors,
917 "the following exceptions were caught: %r" % errors)
918 s = b''.join(results)
919 for i in range(256):
920 c = bytes(bytearray([i]))
921 self.assertEqual(s.count(c), N)
922 finally:
923 support.unlink(support.TESTFN)
924
Antoine Pitrou1e44fec2011-10-04 12:26:20 +0200925 def test_unseekable(self):
926 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
927 self.assertRaises(self.UnsupportedOperation, bufio.tell)
928 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
929 bufio.read(1)
930 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
931 self.assertRaises(self.UnsupportedOperation, bufio.tell)
932
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000933 def test_misbehaved_io(self):
934 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
935 bufio = self.tp(rawio)
936 self.assertRaises(IOError, bufio.seek, 0)
937 self.assertRaises(IOError, bufio.tell)
938
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000939 def test_no_extraneous_read(self):
940 # Issue #9550; when the raw IO object has satisfied the read request,
941 # we should not issue any additional reads, otherwise it may block
942 # (e.g. socket).
943 bufsize = 16
944 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
945 rawio = self.MockRawIO([b"x" * n])
946 bufio = self.tp(rawio, bufsize)
947 self.assertEqual(bufio.read(n), b"x" * n)
948 # Simple case: one raw read is enough to satisfy the request.
949 self.assertEqual(rawio._extraneous_reads, 0,
950 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
951 # A more complex case where two raw reads are needed to satisfy
952 # the request.
953 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
954 bufio = self.tp(rawio, bufsize)
955 self.assertEqual(bufio.read(n), b"x" * n)
956 self.assertEqual(rawio._extraneous_reads, 0,
957 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
958
959
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000960class CBufferedReaderTest(BufferedReaderTest):
961 tp = io.BufferedReader
962
963 def test_constructor(self):
964 BufferedReaderTest.test_constructor(self)
965 # The allocation can succeed on 32-bit builds, e.g. with more
966 # than 2GB RAM and a 64-bit kernel.
967 if sys.maxsize > 0x7FFFFFFF:
968 rawio = self.MockRawIO()
969 bufio = self.tp(rawio)
970 self.assertRaises((OverflowError, MemoryError, ValueError),
971 bufio.__init__, rawio, sys.maxsize)
972
973 def test_initialization(self):
974 rawio = self.MockRawIO([b"abc"])
975 bufio = self.tp(rawio)
976 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
977 self.assertRaises(ValueError, bufio.read)
978 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
979 self.assertRaises(ValueError, bufio.read)
980 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
981 self.assertRaises(ValueError, bufio.read)
982
983 def test_misbehaved_io_read(self):
984 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
985 bufio = self.tp(rawio)
986 # _pyio.BufferedReader seems to implement reading different, so that
987 # checking this is not so easy.
988 self.assertRaises(IOError, bufio.read, 10)
989
990 def test_garbage_collection(self):
991 # C BufferedReader objects are collected.
992 # The Python version has __del__, so it ends into gc.garbage instead
993 rawio = self.FileIO(support.TESTFN, "w+b")
994 f = self.tp(rawio)
995 f.f = f
996 wr = weakref.ref(f)
997 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000998 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000999 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001000
1001class PyBufferedReaderTest(BufferedReaderTest):
1002 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001003
Guido van Rossuma9e20242007-03-08 00:43:48 +00001004
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001005class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1006 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001007
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001008 def test_constructor(self):
1009 rawio = self.MockRawIO()
1010 bufio = self.tp(rawio)
1011 bufio.__init__(rawio)
1012 bufio.__init__(rawio, buffer_size=1024)
1013 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001014 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001015 bufio.flush()
1016 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1017 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1018 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1019 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001020 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001021 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001022 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001023
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001024 def test_detach_flush(self):
1025 raw = self.MockRawIO()
1026 buf = self.tp(raw)
1027 buf.write(b"howdy!")
1028 self.assertFalse(raw._write_stack)
1029 buf.detach()
1030 self.assertEqual(raw._write_stack, [b"howdy!"])
1031
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001032 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001033 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001034 writer = self.MockRawIO()
1035 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001036 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001037 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001038
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001039 def test_write_overflow(self):
1040 writer = self.MockRawIO()
1041 bufio = self.tp(writer, 8)
1042 contents = b"abcdefghijklmnop"
1043 for n in range(0, len(contents), 3):
1044 bufio.write(contents[n:n+3])
1045 flushed = b"".join(writer._write_stack)
1046 # At least (total - 8) bytes were implicitly flushed, perhaps more
1047 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001048 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001049
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001050 def check_writes(self, intermediate_func):
1051 # Lots of writes, test the flushed output is as expected.
1052 contents = bytes(range(256)) * 1000
1053 n = 0
1054 writer = self.MockRawIO()
1055 bufio = self.tp(writer, 13)
1056 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1057 def gen_sizes():
1058 for size in count(1):
1059 for i in range(15):
1060 yield size
1061 sizes = gen_sizes()
1062 while n < len(contents):
1063 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001064 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001065 intermediate_func(bufio)
1066 n += size
1067 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001068 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001069
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001070 def test_writes(self):
1071 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001072
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001073 def test_writes_and_flushes(self):
1074 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001075
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001076 def test_writes_and_seeks(self):
1077 def _seekabs(bufio):
1078 pos = bufio.tell()
1079 bufio.seek(pos + 1, 0)
1080 bufio.seek(pos - 1, 0)
1081 bufio.seek(pos, 0)
1082 self.check_writes(_seekabs)
1083 def _seekrel(bufio):
1084 pos = bufio.seek(0, 1)
1085 bufio.seek(+1, 1)
1086 bufio.seek(-1, 1)
1087 bufio.seek(pos, 0)
1088 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001089
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001090 def test_writes_and_truncates(self):
1091 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001092
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001093 def test_write_non_blocking(self):
1094 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001095 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001096
Ezio Melottib3aedd42010-11-20 19:04:17 +00001097 self.assertEqual(bufio.write(b"abcd"), 4)
1098 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001099 # 1 byte will be written, the rest will be buffered
1100 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001101 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001102
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001103 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1104 raw.block_on(b"0")
1105 try:
1106 bufio.write(b"opqrwxyz0123456789")
1107 except self.BlockingIOError as e:
1108 written = e.characters_written
1109 else:
1110 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001111 self.assertEqual(written, 16)
1112 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001113 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001114
Ezio Melottib3aedd42010-11-20 19:04:17 +00001115 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001116 s = raw.pop_written()
1117 # Previously buffered bytes were flushed
1118 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001119
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001120 def test_write_and_rewind(self):
1121 raw = io.BytesIO()
1122 bufio = self.tp(raw, 4)
1123 self.assertEqual(bufio.write(b"abcdef"), 6)
1124 self.assertEqual(bufio.tell(), 6)
1125 bufio.seek(0, 0)
1126 self.assertEqual(bufio.write(b"XY"), 2)
1127 bufio.seek(6, 0)
1128 self.assertEqual(raw.getvalue(), b"XYcdef")
1129 self.assertEqual(bufio.write(b"123456"), 6)
1130 bufio.flush()
1131 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001132
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001133 def test_flush(self):
1134 writer = self.MockRawIO()
1135 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001136 bufio.write(b"abc")
1137 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001138 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001139
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001140 def test_destructor(self):
1141 writer = self.MockRawIO()
1142 bufio = self.tp(writer, 8)
1143 bufio.write(b"abc")
1144 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001145 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001146 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001147
1148 def test_truncate(self):
1149 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001150 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001151 bufio = self.tp(raw, 8)
1152 bufio.write(b"abcdef")
1153 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001154 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001155 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001156 self.assertEqual(f.read(), b"abc")
1157
Victor Stinner45df8202010-04-28 22:31:17 +00001158 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001159 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001160 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001161 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001162 # Write out many bytes from many threads and test they were
1163 # all flushed.
1164 N = 1000
1165 contents = bytes(range(256)) * N
1166 sizes = cycle([1, 19])
1167 n = 0
1168 queue = deque()
1169 while n < len(contents):
1170 size = next(sizes)
1171 queue.append(contents[n:n+size])
1172 n += size
1173 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001174 # We use a real file object because it allows us to
1175 # exercise situations where the GIL is released before
1176 # writing the buffer to the raw streams. This is in addition
1177 # to concurrency issues due to switching threads in the middle
1178 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001179 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001180 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001181 errors = []
1182 def f():
1183 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001184 while True:
1185 try:
1186 s = queue.popleft()
1187 except IndexError:
1188 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001189 bufio.write(s)
1190 except Exception as e:
1191 errors.append(e)
1192 raise
1193 threads = [threading.Thread(target=f) for x in range(20)]
1194 for t in threads:
1195 t.start()
1196 time.sleep(0.02) # yield
1197 for t in threads:
1198 t.join()
1199 self.assertFalse(errors,
1200 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001201 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001202 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001203 s = f.read()
1204 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001205 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001206 finally:
1207 support.unlink(support.TESTFN)
1208
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001209 def test_misbehaved_io(self):
1210 rawio = self.MisbehavedRawIO()
1211 bufio = self.tp(rawio, 5)
1212 self.assertRaises(IOError, bufio.seek, 0)
1213 self.assertRaises(IOError, bufio.tell)
1214 self.assertRaises(IOError, bufio.write, b"abcdef")
1215
Benjamin Peterson59406a92009-03-26 17:10:29 +00001216 def test_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001217 with support.check_warnings(("max_buffer_size is deprecated",
1218 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001219 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001220
1221
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001222class CBufferedWriterTest(BufferedWriterTest):
1223 tp = io.BufferedWriter
1224
1225 def test_constructor(self):
1226 BufferedWriterTest.test_constructor(self)
1227 # The allocation can succeed on 32-bit builds, e.g. with more
1228 # than 2GB RAM and a 64-bit kernel.
1229 if sys.maxsize > 0x7FFFFFFF:
1230 rawio = self.MockRawIO()
1231 bufio = self.tp(rawio)
1232 self.assertRaises((OverflowError, MemoryError, ValueError),
1233 bufio.__init__, rawio, sys.maxsize)
1234
1235 def test_initialization(self):
1236 rawio = self.MockRawIO()
1237 bufio = self.tp(rawio)
1238 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1239 self.assertRaises(ValueError, bufio.write, b"def")
1240 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1241 self.assertRaises(ValueError, bufio.write, b"def")
1242 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1243 self.assertRaises(ValueError, bufio.write, b"def")
1244
1245 def test_garbage_collection(self):
1246 # C BufferedWriter objects are collected, and collecting them flushes
1247 # all data to disk.
1248 # The Python version has __del__, so it ends into gc.garbage instead
1249 rawio = self.FileIO(support.TESTFN, "w+b")
1250 f = self.tp(rawio)
1251 f.write(b"123xxx")
1252 f.x = f
1253 wr = weakref.ref(f)
1254 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001255 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001256 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001257 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001258 self.assertEqual(f.read(), b"123xxx")
1259
1260
1261class PyBufferedWriterTest(BufferedWriterTest):
1262 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001263
Guido van Rossum01a27522007-03-07 01:00:12 +00001264class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001265
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001266 def test_constructor(self):
1267 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001268 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001269
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001270 def test_detach(self):
1271 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1272 self.assertRaises(self.UnsupportedOperation, pair.detach)
1273
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001274 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001275 with support.check_warnings(("max_buffer_size is deprecated",
1276 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001277 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001278
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001279 def test_constructor_with_not_readable(self):
1280 class NotReadable(MockRawIO):
1281 def readable(self):
1282 return False
1283
1284 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1285
1286 def test_constructor_with_not_writeable(self):
1287 class NotWriteable(MockRawIO):
1288 def writable(self):
1289 return False
1290
1291 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1292
1293 def test_read(self):
1294 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1295
1296 self.assertEqual(pair.read(3), b"abc")
1297 self.assertEqual(pair.read(1), b"d")
1298 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001299 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1300 self.assertEqual(pair.read(None), b"abc")
1301
1302 def test_readlines(self):
1303 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1304 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1305 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1306 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001307
1308 def test_read1(self):
1309 # .read1() is delegated to the underlying reader object, so this test
1310 # can be shallow.
1311 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1312
1313 self.assertEqual(pair.read1(3), b"abc")
1314
1315 def test_readinto(self):
1316 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1317
1318 data = bytearray(5)
1319 self.assertEqual(pair.readinto(data), 5)
1320 self.assertEqual(data, b"abcde")
1321
1322 def test_write(self):
1323 w = self.MockRawIO()
1324 pair = self.tp(self.MockRawIO(), w)
1325
1326 pair.write(b"abc")
1327 pair.flush()
1328 pair.write(b"def")
1329 pair.flush()
1330 self.assertEqual(w._write_stack, [b"abc", b"def"])
1331
1332 def test_peek(self):
1333 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1334
1335 self.assertTrue(pair.peek(3).startswith(b"abc"))
1336 self.assertEqual(pair.read(3), b"abc")
1337
1338 def test_readable(self):
1339 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1340 self.assertTrue(pair.readable())
1341
1342 def test_writeable(self):
1343 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1344 self.assertTrue(pair.writable())
1345
1346 def test_seekable(self):
1347 # BufferedRWPairs are never seekable, even if their readers and writers
1348 # are.
1349 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1350 self.assertFalse(pair.seekable())
1351
1352 # .flush() is delegated to the underlying writer object and has been
1353 # tested in the test_write method.
1354
1355 def test_close_and_closed(self):
1356 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1357 self.assertFalse(pair.closed)
1358 pair.close()
1359 self.assertTrue(pair.closed)
1360
1361 def test_isatty(self):
1362 class SelectableIsAtty(MockRawIO):
1363 def __init__(self, isatty):
1364 MockRawIO.__init__(self)
1365 self._isatty = isatty
1366
1367 def isatty(self):
1368 return self._isatty
1369
1370 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1371 self.assertFalse(pair.isatty())
1372
1373 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1374 self.assertTrue(pair.isatty())
1375
1376 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1377 self.assertTrue(pair.isatty())
1378
1379 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1380 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001381
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001382class CBufferedRWPairTest(BufferedRWPairTest):
1383 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001384
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001385class PyBufferedRWPairTest(BufferedRWPairTest):
1386 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001387
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001388
1389class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1390 read_mode = "rb+"
1391 write_mode = "wb+"
1392
1393 def test_constructor(self):
1394 BufferedReaderTest.test_constructor(self)
1395 BufferedWriterTest.test_constructor(self)
1396
1397 def test_read_and_write(self):
1398 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001399 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001400
1401 self.assertEqual(b"as", rw.read(2))
1402 rw.write(b"ddd")
1403 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001404 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001405 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001406 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001407
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001408 def test_seek_and_tell(self):
1409 raw = self.BytesIO(b"asdfghjkl")
1410 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001411
Ezio Melottib3aedd42010-11-20 19:04:17 +00001412 self.assertEqual(b"as", rw.read(2))
1413 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001414 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001415 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001416
Antoine Pitroue05565e2011-08-20 14:39:23 +02001417 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001418 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001419 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001420 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001421 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001422 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001423 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001424 self.assertEqual(7, rw.tell())
1425 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001426 rw.flush()
1427 self.assertEqual(b"asdf123fl", raw.getvalue())
1428
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001429 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001430
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001431 def check_flush_and_read(self, read_func):
1432 raw = self.BytesIO(b"abcdefghi")
1433 bufio = self.tp(raw)
1434
Ezio Melottib3aedd42010-11-20 19:04:17 +00001435 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001436 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001437 self.assertEqual(b"ef", read_func(bufio, 2))
1438 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001439 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001440 self.assertEqual(6, bufio.tell())
1441 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001442 raw.seek(0, 0)
1443 raw.write(b"XYZ")
1444 # flush() resets the read buffer
1445 bufio.flush()
1446 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001447 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001448
1449 def test_flush_and_read(self):
1450 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1451
1452 def test_flush_and_readinto(self):
1453 def _readinto(bufio, n=-1):
1454 b = bytearray(n if n >= 0 else 9999)
1455 n = bufio.readinto(b)
1456 return bytes(b[:n])
1457 self.check_flush_and_read(_readinto)
1458
1459 def test_flush_and_peek(self):
1460 def _peek(bufio, n=-1):
1461 # This relies on the fact that the buffer can contain the whole
1462 # raw stream, otherwise peek() can return less.
1463 b = bufio.peek(n)
1464 if n != -1:
1465 b = b[:n]
1466 bufio.seek(len(b), 1)
1467 return b
1468 self.check_flush_and_read(_peek)
1469
1470 def test_flush_and_write(self):
1471 raw = self.BytesIO(b"abcdefghi")
1472 bufio = self.tp(raw)
1473
1474 bufio.write(b"123")
1475 bufio.flush()
1476 bufio.write(b"45")
1477 bufio.flush()
1478 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001479 self.assertEqual(b"12345fghi", raw.getvalue())
1480 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001481
1482 def test_threads(self):
1483 BufferedReaderTest.test_threads(self)
1484 BufferedWriterTest.test_threads(self)
1485
1486 def test_writes_and_peek(self):
1487 def _peek(bufio):
1488 bufio.peek(1)
1489 self.check_writes(_peek)
1490 def _peek(bufio):
1491 pos = bufio.tell()
1492 bufio.seek(-1, 1)
1493 bufio.peek(1)
1494 bufio.seek(pos, 0)
1495 self.check_writes(_peek)
1496
1497 def test_writes_and_reads(self):
1498 def _read(bufio):
1499 bufio.seek(-1, 1)
1500 bufio.read(1)
1501 self.check_writes(_read)
1502
1503 def test_writes_and_read1s(self):
1504 def _read1(bufio):
1505 bufio.seek(-1, 1)
1506 bufio.read1(1)
1507 self.check_writes(_read1)
1508
1509 def test_writes_and_readintos(self):
1510 def _read(bufio):
1511 bufio.seek(-1, 1)
1512 bufio.readinto(bytearray(1))
1513 self.check_writes(_read)
1514
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001515 def test_write_after_readahead(self):
1516 # Issue #6629: writing after the buffer was filled by readahead should
1517 # first rewind the raw stream.
1518 for overwrite_size in [1, 5]:
1519 raw = self.BytesIO(b"A" * 10)
1520 bufio = self.tp(raw, 4)
1521 # Trigger readahead
1522 self.assertEqual(bufio.read(1), b"A")
1523 self.assertEqual(bufio.tell(), 1)
1524 # Overwriting should rewind the raw stream if it needs so
1525 bufio.write(b"B" * overwrite_size)
1526 self.assertEqual(bufio.tell(), overwrite_size + 1)
1527 # If the write size was smaller than the buffer size, flush() and
1528 # check that rewind happens.
1529 bufio.flush()
1530 self.assertEqual(bufio.tell(), overwrite_size + 1)
1531 s = raw.getvalue()
1532 self.assertEqual(s,
1533 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1534
Antoine Pitrou7c404892011-05-13 00:13:33 +02001535 def test_write_rewind_write(self):
1536 # Various combinations of reading / writing / seeking backwards / writing again
1537 def mutate(bufio, pos1, pos2):
1538 assert pos2 >= pos1
1539 # Fill the buffer
1540 bufio.seek(pos1)
1541 bufio.read(pos2 - pos1)
1542 bufio.write(b'\x02')
1543 # This writes earlier than the previous write, but still inside
1544 # the buffer.
1545 bufio.seek(pos1)
1546 bufio.write(b'\x01')
1547
1548 b = b"\x80\x81\x82\x83\x84"
1549 for i in range(0, len(b)):
1550 for j in range(i, len(b)):
1551 raw = self.BytesIO(b)
1552 bufio = self.tp(raw, 100)
1553 mutate(bufio, i, j)
1554 bufio.flush()
1555 expected = bytearray(b)
1556 expected[j] = 2
1557 expected[i] = 1
1558 self.assertEqual(raw.getvalue(), expected,
1559 "failed result for i=%d, j=%d" % (i, j))
1560
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001561 def test_truncate_after_read_or_write(self):
1562 raw = self.BytesIO(b"A" * 10)
1563 bufio = self.tp(raw, 100)
1564 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1565 self.assertEqual(bufio.truncate(), 2)
1566 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1567 self.assertEqual(bufio.truncate(), 4)
1568
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001569 def test_misbehaved_io(self):
1570 BufferedReaderTest.test_misbehaved_io(self)
1571 BufferedWriterTest.test_misbehaved_io(self)
1572
Antoine Pitroue05565e2011-08-20 14:39:23 +02001573 def test_interleaved_read_write(self):
1574 # Test for issue #12213
1575 with self.BytesIO(b'abcdefgh') as raw:
1576 with self.tp(raw, 100) as f:
1577 f.write(b"1")
1578 self.assertEqual(f.read(1), b'b')
1579 f.write(b'2')
1580 self.assertEqual(f.read1(1), b'd')
1581 f.write(b'3')
1582 buf = bytearray(1)
1583 f.readinto(buf)
1584 self.assertEqual(buf, b'f')
1585 f.write(b'4')
1586 self.assertEqual(f.peek(1), b'h')
1587 f.flush()
1588 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1589
1590 with self.BytesIO(b'abc') as raw:
1591 with self.tp(raw, 100) as f:
1592 self.assertEqual(f.read(1), b'a')
1593 f.write(b"2")
1594 self.assertEqual(f.read(1), b'c')
1595 f.flush()
1596 self.assertEqual(raw.getvalue(), b'a2c')
1597
1598 def test_interleaved_readline_write(self):
1599 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1600 with self.tp(raw) as f:
1601 f.write(b'1')
1602 self.assertEqual(f.readline(), b'b\n')
1603 f.write(b'2')
1604 self.assertEqual(f.readline(), b'def\n')
1605 f.write(b'3')
1606 self.assertEqual(f.readline(), b'\n')
1607 f.flush()
1608 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1609
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001610 # You can't construct a BufferedRandom over a non-seekable stream.
1611 test_unseekable = None
1612
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001613class CBufferedRandomTest(BufferedRandomTest):
1614 tp = io.BufferedRandom
1615
1616 def test_constructor(self):
1617 BufferedRandomTest.test_constructor(self)
1618 # The allocation can succeed on 32-bit builds, e.g. with more
1619 # than 2GB RAM and a 64-bit kernel.
1620 if sys.maxsize > 0x7FFFFFFF:
1621 rawio = self.MockRawIO()
1622 bufio = self.tp(rawio)
1623 self.assertRaises((OverflowError, MemoryError, ValueError),
1624 bufio.__init__, rawio, sys.maxsize)
1625
1626 def test_garbage_collection(self):
1627 CBufferedReaderTest.test_garbage_collection(self)
1628 CBufferedWriterTest.test_garbage_collection(self)
1629
1630class PyBufferedRandomTest(BufferedRandomTest):
1631 tp = pyio.BufferedRandom
1632
1633
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001634# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1635# properties:
1636# - A single output character can correspond to many bytes of input.
1637# - The number of input bytes to complete the character can be
1638# undetermined until the last input byte is received.
1639# - The number of input bytes can vary depending on previous input.
1640# - A single input byte can correspond to many characters of output.
1641# - The number of output characters can be undetermined until the
1642# last input byte is received.
1643# - The number of output characters can vary depending on previous input.
1644
1645class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1646 """
1647 For testing seek/tell behavior with a stateful, buffering decoder.
1648
1649 Input is a sequence of words. Words may be fixed-length (length set
1650 by input) or variable-length (period-terminated). In variable-length
1651 mode, extra periods are ignored. Possible words are:
1652 - 'i' followed by a number sets the input length, I (maximum 99).
1653 When I is set to 0, words are space-terminated.
1654 - 'o' followed by a number sets the output length, O (maximum 99).
1655 - Any other word is converted into a word followed by a period on
1656 the output. The output word consists of the input word truncated
1657 or padded out with hyphens to make its length equal to O. If O
1658 is 0, the word is output verbatim without truncating or padding.
1659 I and O are initially set to 1. When I changes, any buffered input is
1660 re-scanned according to the new I. EOF also terminates the last word.
1661 """
1662
1663 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001664 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001665 self.reset()
1666
1667 def __repr__(self):
1668 return '<SID %x>' % id(self)
1669
1670 def reset(self):
1671 self.i = 1
1672 self.o = 1
1673 self.buffer = bytearray()
1674
1675 def getstate(self):
1676 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1677 return bytes(self.buffer), i*100 + o
1678
1679 def setstate(self, state):
1680 buffer, io = state
1681 self.buffer = bytearray(buffer)
1682 i, o = divmod(io, 100)
1683 self.i, self.o = i ^ 1, o ^ 1
1684
1685 def decode(self, input, final=False):
1686 output = ''
1687 for b in input:
1688 if self.i == 0: # variable-length, terminated with period
1689 if b == ord('.'):
1690 if self.buffer:
1691 output += self.process_word()
1692 else:
1693 self.buffer.append(b)
1694 else: # fixed-length, terminate after self.i bytes
1695 self.buffer.append(b)
1696 if len(self.buffer) == self.i:
1697 output += self.process_word()
1698 if final and self.buffer: # EOF terminates the last word
1699 output += self.process_word()
1700 return output
1701
1702 def process_word(self):
1703 output = ''
1704 if self.buffer[0] == ord('i'):
1705 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1706 elif self.buffer[0] == ord('o'):
1707 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1708 else:
1709 output = self.buffer.decode('ascii')
1710 if len(output) < self.o:
1711 output += '-'*self.o # pad out with hyphens
1712 if self.o:
1713 output = output[:self.o] # truncate to output length
1714 output += '.'
1715 self.buffer = bytearray()
1716 return output
1717
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001718 codecEnabled = False
1719
1720 @classmethod
1721 def lookupTestDecoder(cls, name):
1722 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001723 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001724 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001725 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001726 incrementalencoder=None,
1727 streamreader=None, streamwriter=None,
1728 incrementaldecoder=cls)
1729
1730# Register the previous decoder for testing.
1731# Disabled by default, tests will enable it.
1732codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1733
1734
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001735class StatefulIncrementalDecoderTest(unittest.TestCase):
1736 """
1737 Make sure the StatefulIncrementalDecoder actually works.
1738 """
1739
1740 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001741 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001742 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001743 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001744 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001745 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001746 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001747 # I=0, O=6 (variable-length input, fixed-length output)
1748 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1749 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001750 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001751 # I=6, O=3 (fixed-length input > fixed-length output)
1752 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1753 # I=0, then 3; O=29, then 15 (with longer output)
1754 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1755 'a----------------------------.' +
1756 'b----------------------------.' +
1757 'cde--------------------------.' +
1758 'abcdefghijabcde.' +
1759 'a.b------------.' +
1760 '.c.------------.' +
1761 'd.e------------.' +
1762 'k--------------.' +
1763 'l--------------.' +
1764 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001765 ]
1766
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001767 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001768 # Try a few one-shot test cases.
1769 for input, eof, output in self.test_cases:
1770 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001771 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001772
1773 # Also test an unfinished decode, followed by forcing EOF.
1774 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001775 self.assertEqual(d.decode(b'oiabcd'), '')
1776 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001777
1778class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001779
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001780 def setUp(self):
1781 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1782 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001783 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001784
Guido van Rossumd0712812007-04-11 16:32:43 +00001785 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001786 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001787
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001788 def test_constructor(self):
1789 r = self.BytesIO(b"\xc3\xa9\n\n")
1790 b = self.BufferedReader(r, 1000)
1791 t = self.TextIOWrapper(b)
1792 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001793 self.assertEqual(t.encoding, "latin1")
1794 self.assertEqual(t.line_buffering, False)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001795 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001796 self.assertEqual(t.encoding, "utf8")
1797 self.assertEqual(t.line_buffering, True)
1798 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001799 self.assertRaises(TypeError, t.__init__, b, newline=42)
1800 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1801
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001802 def test_detach(self):
1803 r = self.BytesIO()
1804 b = self.BufferedWriter(r)
1805 t = self.TextIOWrapper(b)
1806 self.assertIs(t.detach(), b)
1807
1808 t = self.TextIOWrapper(b, encoding="ascii")
1809 t.write("howdy")
1810 self.assertFalse(r.getvalue())
1811 t.detach()
1812 self.assertEqual(r.getvalue(), b"howdy")
1813 self.assertRaises(ValueError, t.detach)
1814
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001815 def test_repr(self):
1816 raw = self.BytesIO("hello".encode("utf-8"))
1817 b = self.BufferedReader(raw)
1818 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001819 modname = self.TextIOWrapper.__module__
1820 self.assertEqual(repr(t),
1821 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1822 raw.name = "dummy"
1823 self.assertEqual(repr(t),
1824 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001825 t.mode = "r"
1826 self.assertEqual(repr(t),
1827 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001828 raw.name = b"dummy"
1829 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001830 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001831
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001832 def test_line_buffering(self):
1833 r = self.BytesIO()
1834 b = self.BufferedWriter(r, 1000)
1835 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001836 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001837 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001838 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001839 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001840 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001841 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001842
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001843 def test_encoding(self):
1844 # Check the encoding attribute is always set, and valid
1845 b = self.BytesIO()
1846 t = self.TextIOWrapper(b, encoding="utf8")
1847 self.assertEqual(t.encoding, "utf8")
1848 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001849 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001850 codecs.lookup(t.encoding)
1851
1852 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001853 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001854 b = self.BytesIO(b"abc\n\xff\n")
1855 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001856 self.assertRaises(UnicodeError, t.read)
1857 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001858 b = self.BytesIO(b"abc\n\xff\n")
1859 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001860 self.assertRaises(UnicodeError, t.read)
1861 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001862 b = self.BytesIO(b"abc\n\xff\n")
1863 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001864 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001865 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001866 b = self.BytesIO(b"abc\n\xff\n")
1867 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001868 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001869
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001870 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001871 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001872 b = self.BytesIO()
1873 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001874 self.assertRaises(UnicodeError, t.write, "\xff")
1875 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001876 b = self.BytesIO()
1877 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001878 self.assertRaises(UnicodeError, t.write, "\xff")
1879 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001880 b = self.BytesIO()
1881 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001882 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001883 t.write("abc\xffdef\n")
1884 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001885 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001886 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001887 b = self.BytesIO()
1888 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001889 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001890 t.write("abc\xffdef\n")
1891 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001892 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001893
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001894 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001895 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1896
1897 tests = [
1898 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001899 [ '', input_lines ],
1900 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1901 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1902 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001903 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001904 encodings = (
1905 'utf-8', 'latin-1',
1906 'utf-16', 'utf-16-le', 'utf-16-be',
1907 'utf-32', 'utf-32-le', 'utf-32-be',
1908 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001909
Guido van Rossum8358db22007-08-18 21:39:55 +00001910 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001911 # character in TextIOWrapper._pending_line.
1912 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001913 # XXX: str.encode() should return bytes
1914 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001915 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001916 for bufsize in range(1, 10):
1917 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001918 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1919 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001920 encoding=encoding)
1921 if do_reads:
1922 got_lines = []
1923 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001924 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001925 if c2 == '':
1926 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00001927 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001928 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001929 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001930 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001931
1932 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001933 self.assertEqual(got_line, exp_line)
1934 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001935
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001936 def test_newlines_input(self):
1937 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001938 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1939 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001940 (None, normalized.decode("ascii").splitlines(True)),
1941 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001942 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1943 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1944 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001945 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001946 buf = self.BytesIO(testdata)
1947 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001948 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00001949 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001950 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001951
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001952 def test_newlines_output(self):
1953 testdict = {
1954 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1955 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1956 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1957 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1958 }
1959 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1960 for newline, expected in tests:
1961 buf = self.BytesIO()
1962 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1963 txt.write("AAA\nB")
1964 txt.write("BB\nCCC\n")
1965 txt.write("X\rY\r\nZ")
1966 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001967 self.assertEqual(buf.closed, False)
1968 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001969
1970 def test_destructor(self):
1971 l = []
1972 base = self.BytesIO
1973 class MyBytesIO(base):
1974 def close(self):
1975 l.append(self.getvalue())
1976 base.close(self)
1977 b = MyBytesIO()
1978 t = self.TextIOWrapper(b, encoding="ascii")
1979 t.write("abc")
1980 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001981 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001982 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001983
1984 def test_override_destructor(self):
1985 record = []
1986 class MyTextIO(self.TextIOWrapper):
1987 def __del__(self):
1988 record.append(1)
1989 try:
1990 f = super().__del__
1991 except AttributeError:
1992 pass
1993 else:
1994 f()
1995 def close(self):
1996 record.append(2)
1997 super().close()
1998 def flush(self):
1999 record.append(3)
2000 super().flush()
2001 b = self.BytesIO()
2002 t = MyTextIO(b, encoding="ascii")
2003 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002004 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002005 self.assertEqual(record, [1, 2, 3])
2006
2007 def test_error_through_destructor(self):
2008 # Test that the exception state is not modified by a destructor,
2009 # even if close() fails.
2010 rawio = self.CloseFailureIO()
2011 def f():
2012 self.TextIOWrapper(rawio).xyzzy
2013 with support.captured_output("stderr") as s:
2014 self.assertRaises(AttributeError, f)
2015 s = s.getvalue().strip()
2016 if s:
2017 # The destructor *may* have printed an unraisable error, check it
2018 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002019 self.assertTrue(s.startswith("Exception IOError: "), s)
2020 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002021
Guido van Rossum9b76da62007-04-11 01:09:03 +00002022 # Systematic tests of the text I/O API
2023
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002024 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002025 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2026 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002027 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002028 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002029 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002030 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002031 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002032 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002033 self.assertEqual(f.tell(), 0)
2034 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002035 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002036 self.assertEqual(f.seek(0), 0)
2037 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002038 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002039 self.assertEqual(f.read(2), "ab")
2040 self.assertEqual(f.read(1), "c")
2041 self.assertEqual(f.read(1), "")
2042 self.assertEqual(f.read(), "")
2043 self.assertEqual(f.tell(), cookie)
2044 self.assertEqual(f.seek(0), 0)
2045 self.assertEqual(f.seek(0, 2), cookie)
2046 self.assertEqual(f.write("def"), 3)
2047 self.assertEqual(f.seek(cookie), cookie)
2048 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002049 if enc.startswith("utf"):
2050 self.multi_line_test(f, enc)
2051 f.close()
2052
2053 def multi_line_test(self, f, enc):
2054 f.seek(0)
2055 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002056 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002057 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002058 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002059 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002060 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002061 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002062 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002063 wlines.append((f.tell(), line))
2064 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002065 f.seek(0)
2066 rlines = []
2067 while True:
2068 pos = f.tell()
2069 line = f.readline()
2070 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002071 break
2072 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002073 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002074
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002075 def test_telling(self):
2076 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002077 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002078 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002079 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002080 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002081 p2 = f.tell()
2082 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002083 self.assertEqual(f.tell(), p0)
2084 self.assertEqual(f.readline(), "\xff\n")
2085 self.assertEqual(f.tell(), p1)
2086 self.assertEqual(f.readline(), "\xff\n")
2087 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002088 f.seek(0)
2089 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002090 self.assertEqual(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002091 self.assertRaises(IOError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002092 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002093 f.close()
2094
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002095 def test_seeking(self):
2096 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002097 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002098 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002099 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002100 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002101 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002102 suffix = bytes(u_suffix.encode("utf-8"))
2103 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002104 with self.open(support.TESTFN, "wb") as f:
2105 f.write(line*2)
2106 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2107 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002108 self.assertEqual(s, str(prefix, "ascii"))
2109 self.assertEqual(f.tell(), prefix_size)
2110 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002111
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002112 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002113 # Regression test for a specific bug
2114 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002115 with self.open(support.TESTFN, "wb") as f:
2116 f.write(data)
2117 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2118 f._CHUNK_SIZE # Just test that it exists
2119 f._CHUNK_SIZE = 2
2120 f.readline()
2121 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002122
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002123 def test_seek_and_tell(self):
2124 #Test seek/tell using the StatefulIncrementalDecoder.
2125 # Make test faster by doing smaller seeks
2126 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002127
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002128 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002129 """Tell/seek to various points within a data stream and ensure
2130 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002131 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002132 f.write(data)
2133 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002134 f = self.open(support.TESTFN, encoding='test_decoder')
2135 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002136 decoded = f.read()
2137 f.close()
2138
Neal Norwitze2b07052008-03-18 19:52:05 +00002139 for i in range(min_pos, len(decoded) + 1): # seek positions
2140 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002141 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002142 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002143 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002144 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002145 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002146 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002147 f.close()
2148
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002149 # Enable the test decoder.
2150 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002151
2152 # Run the tests.
2153 try:
2154 # Try each test case.
2155 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002156 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002157
2158 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002159 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2160 offset = CHUNK_SIZE - len(input)//2
2161 prefix = b'.'*offset
2162 # Don't bother seeking into the prefix (takes too long).
2163 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002164 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002165
2166 # Ensure our test decoder won't interfere with subsequent tests.
2167 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002168 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002169
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002170 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002171 data = "1234567890"
2172 tests = ("utf-16",
2173 "utf-16-le",
2174 "utf-16-be",
2175 "utf-32",
2176 "utf-32-le",
2177 "utf-32-be")
2178 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002179 buf = self.BytesIO()
2180 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002181 # Check if the BOM is written only once (see issue1753).
2182 f.write(data)
2183 f.write(data)
2184 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002185 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002186 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002187 self.assertEqual(f.read(), data * 2)
2188 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002189
Benjamin Petersona1b49012009-03-31 23:11:32 +00002190 def test_unreadable(self):
2191 class UnReadable(self.BytesIO):
2192 def readable(self):
2193 return False
2194 txt = self.TextIOWrapper(UnReadable())
2195 self.assertRaises(IOError, txt.read)
2196
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002197 def test_read_one_by_one(self):
2198 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002199 reads = ""
2200 while True:
2201 c = txt.read(1)
2202 if not c:
2203 break
2204 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002205 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002206
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002207 def test_readlines(self):
2208 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2209 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2210 txt.seek(0)
2211 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2212 txt.seek(0)
2213 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2214
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002215 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002216 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002217 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002218 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002219 reads = ""
2220 while True:
2221 c = txt.read(128)
2222 if not c:
2223 break
2224 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002225 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002226
2227 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002228 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002229
2230 # read one char at a time
2231 reads = ""
2232 while True:
2233 c = txt.read(1)
2234 if not c:
2235 break
2236 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002237 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002238
2239 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002240 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002241 txt._CHUNK_SIZE = 4
2242
2243 reads = ""
2244 while True:
2245 c = txt.read(4)
2246 if not c:
2247 break
2248 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002249 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002250
2251 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002252 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002253 txt._CHUNK_SIZE = 4
2254
2255 reads = txt.read(4)
2256 reads += txt.read(4)
2257 reads += txt.readline()
2258 reads += txt.readline()
2259 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002260 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002261
2262 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002263 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002264 txt._CHUNK_SIZE = 4
2265
2266 reads = txt.read(4)
2267 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002268 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002269
2270 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002271 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002272 txt._CHUNK_SIZE = 4
2273
2274 reads = txt.read(4)
2275 pos = txt.tell()
2276 txt.seek(0)
2277 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002278 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002279
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002280 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002281 buffer = self.BytesIO(self.testdata)
2282 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002283
2284 self.assertEqual(buffer.seekable(), txt.seekable())
2285
Antoine Pitroue4501852009-05-14 18:55:55 +00002286 def test_append_bom(self):
2287 # The BOM is not written again when appending to a non-empty file
2288 filename = support.TESTFN
2289 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2290 with self.open(filename, 'w', encoding=charset) as f:
2291 f.write('aaa')
2292 pos = f.tell()
2293 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002294 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002295
2296 with self.open(filename, 'a', encoding=charset) as f:
2297 f.write('xxx')
2298 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002299 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002300
2301 def test_seek_bom(self):
2302 # Same test, but when seeking manually
2303 filename = support.TESTFN
2304 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2305 with self.open(filename, 'w', encoding=charset) as f:
2306 f.write('aaa')
2307 pos = f.tell()
2308 with self.open(filename, 'r+', encoding=charset) as f:
2309 f.seek(pos)
2310 f.write('zzz')
2311 f.seek(0)
2312 f.write('bbb')
2313 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002314 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002315
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002316 def test_errors_property(self):
2317 with self.open(support.TESTFN, "w") as f:
2318 self.assertEqual(f.errors, "strict")
2319 with self.open(support.TESTFN, "w", errors="replace") as f:
2320 self.assertEqual(f.errors, "replace")
2321
Victor Stinner45df8202010-04-28 22:31:17 +00002322 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002323 def test_threads_write(self):
2324 # Issue6750: concurrent writes could duplicate data
2325 event = threading.Event()
2326 with self.open(support.TESTFN, "w", buffering=1) as f:
2327 def run(n):
2328 text = "Thread%03d\n" % n
2329 event.wait()
2330 f.write(text)
2331 threads = [threading.Thread(target=lambda n=x: run(n))
2332 for x in range(20)]
2333 for t in threads:
2334 t.start()
2335 time.sleep(0.02)
2336 event.set()
2337 for t in threads:
2338 t.join()
2339 with self.open(support.TESTFN) as f:
2340 content = f.read()
2341 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002342 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002343
Antoine Pitrou6be88762010-05-03 16:48:20 +00002344 def test_flush_error_on_close(self):
2345 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2346 def bad_flush():
2347 raise IOError()
2348 txt.flush = bad_flush
2349 self.assertRaises(IOError, txt.close) # exception not swallowed
2350
2351 def test_multi_close(self):
2352 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2353 txt.close()
2354 txt.close()
2355 txt.close()
2356 self.assertRaises(ValueError, txt.flush)
2357
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002358 def test_unseekable(self):
2359 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2360 self.assertRaises(self.UnsupportedOperation, txt.tell)
2361 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2362
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002363 def test_readonly_attributes(self):
2364 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2365 buf = self.BytesIO(self.testdata)
2366 with self.assertRaises(AttributeError):
2367 txt.buffer = buf
2368
Antoine Pitroue96ec682011-07-23 21:46:35 +02002369 def test_rawio(self):
2370 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2371 # that subprocess.Popen() can have the required unbuffered
2372 # semantics with universal_newlines=True.
2373 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2374 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2375 # Reads
2376 self.assertEqual(txt.read(4), 'abcd')
2377 self.assertEqual(txt.readline(), 'efghi\n')
2378 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2379
2380 def test_rawio_write_through(self):
2381 # Issue #12591: with write_through=True, writes don't need a flush
2382 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2383 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2384 write_through=True)
2385 txt.write('1')
2386 txt.write('23\n4')
2387 txt.write('5')
2388 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2389
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002390class CTextIOWrapperTest(TextIOWrapperTest):
2391
2392 def test_initialization(self):
2393 r = self.BytesIO(b"\xc3\xa9\n\n")
2394 b = self.BufferedReader(r, 1000)
2395 t = self.TextIOWrapper(b)
2396 self.assertRaises(TypeError, t.__init__, b, newline=42)
2397 self.assertRaises(ValueError, t.read)
2398 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2399 self.assertRaises(ValueError, t.read)
2400
2401 def test_garbage_collection(self):
2402 # C TextIOWrapper objects are collected, and collecting them flushes
2403 # all data to disk.
2404 # The Python version has __del__, so it ends in gc.garbage instead.
2405 rawio = io.FileIO(support.TESTFN, "wb")
2406 b = self.BufferedWriter(rawio)
2407 t = self.TextIOWrapper(b, encoding="ascii")
2408 t.write("456def")
2409 t.x = t
2410 wr = weakref.ref(t)
2411 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002412 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002413 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002414 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002415 self.assertEqual(f.read(), b"456def")
2416
Charles-François Natali42c28cd2011-10-05 19:53:43 +02002417 def test_rwpair_cleared_before_textio(self):
2418 # Issue 13070: TextIOWrapper's finalization would crash when called
2419 # after the reference to the underlying BufferedRWPair's writer got
2420 # cleared by the GC.
2421 for i in range(1000):
2422 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2423 t1 = self.TextIOWrapper(b1, encoding="ascii")
2424 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2425 t2 = self.TextIOWrapper(b2, encoding="ascii")
2426 # circular references
2427 t1.buddy = t2
2428 t2.buddy = t1
2429 support.gc_collect()
2430
2431
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002432class PyTextIOWrapperTest(TextIOWrapperTest):
2433 pass
2434
2435
2436class IncrementalNewlineDecoderTest(unittest.TestCase):
2437
2438 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002439 # UTF-8 specific tests for a newline decoder
2440 def _check_decode(b, s, **kwargs):
2441 # We exercise getstate() / setstate() as well as decode()
2442 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002443 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002444 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002445 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002446
Antoine Pitrou180a3362008-12-14 16:36:46 +00002447 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002448
Antoine Pitrou180a3362008-12-14 16:36:46 +00002449 _check_decode(b'\xe8', "")
2450 _check_decode(b'\xa2', "")
2451 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002452
Antoine Pitrou180a3362008-12-14 16:36:46 +00002453 _check_decode(b'\xe8', "")
2454 _check_decode(b'\xa2', "")
2455 _check_decode(b'\x88', "\u8888")
2456
2457 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002458 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2459
Antoine Pitrou180a3362008-12-14 16:36:46 +00002460 decoder.reset()
2461 _check_decode(b'\n', "\n")
2462 _check_decode(b'\r', "")
2463 _check_decode(b'', "\n", final=True)
2464 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002465
Antoine Pitrou180a3362008-12-14 16:36:46 +00002466 _check_decode(b'\r', "")
2467 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002468
Antoine Pitrou180a3362008-12-14 16:36:46 +00002469 _check_decode(b'\r\r\n', "\n\n")
2470 _check_decode(b'\r', "")
2471 _check_decode(b'\r', "\n")
2472 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002473
Antoine Pitrou180a3362008-12-14 16:36:46 +00002474 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2475 _check_decode(b'\xe8\xa2\x88', "\u8888")
2476 _check_decode(b'\n', "\n")
2477 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2478 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002479
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002480 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002481 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002482 if encoding is not None:
2483 encoder = codecs.getincrementalencoder(encoding)()
2484 def _decode_bytewise(s):
2485 # Decode one byte at a time
2486 for b in encoder.encode(s):
2487 result.append(decoder.decode(bytes([b])))
2488 else:
2489 encoder = None
2490 def _decode_bytewise(s):
2491 # Decode one char at a time
2492 for c in s:
2493 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002494 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002495 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002496 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002497 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002498 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002499 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002500 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002501 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002502 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002503 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002504 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002505 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002506 input = "abc"
2507 if encoder is not None:
2508 encoder.reset()
2509 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002510 self.assertEqual(decoder.decode(input), "abc")
2511 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002512
2513 def test_newline_decoder(self):
2514 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002515 # None meaning the IncrementalNewlineDecoder takes unicode input
2516 # rather than bytes input
2517 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002518 'utf-16', 'utf-16-le', 'utf-16-be',
2519 'utf-32', 'utf-32-le', 'utf-32-be',
2520 )
2521 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002522 decoder = enc and codecs.getincrementaldecoder(enc)()
2523 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2524 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002525 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002526 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2527 self.check_newline_decoding_utf8(decoder)
2528
Antoine Pitrou66913e22009-03-06 23:40:56 +00002529 def test_newline_bytes(self):
2530 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2531 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002532 self.assertEqual(dec.newlines, None)
2533 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2534 self.assertEqual(dec.newlines, None)
2535 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2536 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002537 dec = self.IncrementalNewlineDecoder(None, translate=False)
2538 _check(dec)
2539 dec = self.IncrementalNewlineDecoder(None, translate=True)
2540 _check(dec)
2541
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002542class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2543 pass
2544
2545class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2546 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002547
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002548
Guido van Rossum01a27522007-03-07 01:00:12 +00002549# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002550
Guido van Rossum5abbf752007-08-27 17:39:33 +00002551class MiscIOTest(unittest.TestCase):
2552
Barry Warsaw40e82462008-11-20 20:14:50 +00002553 def tearDown(self):
2554 support.unlink(support.TESTFN)
2555
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002556 def test___all__(self):
2557 for name in self.io.__all__:
2558 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002559 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002560 if name == "open":
2561 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002562 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002563 self.assertTrue(issubclass(obj, Exception), name)
2564 elif not name.startswith("SEEK_"):
2565 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002566
Barry Warsaw40e82462008-11-20 20:14:50 +00002567 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002568 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002569 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002570 f.close()
2571
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002572 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002573 self.assertEqual(f.name, support.TESTFN)
2574 self.assertEqual(f.buffer.name, support.TESTFN)
2575 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2576 self.assertEqual(f.mode, "U")
2577 self.assertEqual(f.buffer.mode, "rb")
2578 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002579 f.close()
2580
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002581 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002582 self.assertEqual(f.mode, "w+")
2583 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2584 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002585
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002586 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002587 self.assertEqual(g.mode, "wb")
2588 self.assertEqual(g.raw.mode, "wb")
2589 self.assertEqual(g.name, f.fileno())
2590 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002591 f.close()
2592 g.close()
2593
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002594 def test_io_after_close(self):
2595 for kwargs in [
2596 {"mode": "w"},
2597 {"mode": "wb"},
2598 {"mode": "w", "buffering": 1},
2599 {"mode": "w", "buffering": 2},
2600 {"mode": "wb", "buffering": 0},
2601 {"mode": "r"},
2602 {"mode": "rb"},
2603 {"mode": "r", "buffering": 1},
2604 {"mode": "r", "buffering": 2},
2605 {"mode": "rb", "buffering": 0},
2606 {"mode": "w+"},
2607 {"mode": "w+b"},
2608 {"mode": "w+", "buffering": 1},
2609 {"mode": "w+", "buffering": 2},
2610 {"mode": "w+b", "buffering": 0},
2611 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002612 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002613 f.close()
2614 self.assertRaises(ValueError, f.flush)
2615 self.assertRaises(ValueError, f.fileno)
2616 self.assertRaises(ValueError, f.isatty)
2617 self.assertRaises(ValueError, f.__iter__)
2618 if hasattr(f, "peek"):
2619 self.assertRaises(ValueError, f.peek, 1)
2620 self.assertRaises(ValueError, f.read)
2621 if hasattr(f, "read1"):
2622 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002623 if hasattr(f, "readall"):
2624 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002625 if hasattr(f, "readinto"):
2626 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2627 self.assertRaises(ValueError, f.readline)
2628 self.assertRaises(ValueError, f.readlines)
2629 self.assertRaises(ValueError, f.seek, 0)
2630 self.assertRaises(ValueError, f.tell)
2631 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002632 self.assertRaises(ValueError, f.write,
2633 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002634 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002635 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002636
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002637 def test_blockingioerror(self):
2638 # Various BlockingIOError issues
2639 self.assertRaises(TypeError, self.BlockingIOError)
2640 self.assertRaises(TypeError, self.BlockingIOError, 1)
2641 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2642 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2643 b = self.BlockingIOError(1, "")
2644 self.assertEqual(b.characters_written, 0)
2645 class C(str):
2646 pass
2647 c = C("")
2648 b = self.BlockingIOError(1, c)
2649 c.b = b
2650 b.c = c
2651 wr = weakref.ref(c)
2652 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002653 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002654 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002655
2656 def test_abcs(self):
2657 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002658 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2659 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2660 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2661 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002662
2663 def _check_abc_inheritance(self, abcmodule):
2664 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002665 self.assertIsInstance(f, abcmodule.IOBase)
2666 self.assertIsInstance(f, abcmodule.RawIOBase)
2667 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2668 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002669 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002670 self.assertIsInstance(f, abcmodule.IOBase)
2671 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2672 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2673 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002674 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002675 self.assertIsInstance(f, abcmodule.IOBase)
2676 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2677 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2678 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002679
2680 def test_abc_inheritance(self):
2681 # Test implementations inherit from their respective ABCs
2682 self._check_abc_inheritance(self)
2683
2684 def test_abc_inheritance_official(self):
2685 # Test implementations inherit from the official ABCs of the
2686 # baseline "io" module.
2687 self._check_abc_inheritance(io)
2688
Antoine Pitroue033e062010-10-29 10:38:18 +00002689 def _check_warn_on_dealloc(self, *args, **kwargs):
2690 f = open(*args, **kwargs)
2691 r = repr(f)
2692 with self.assertWarns(ResourceWarning) as cm:
2693 f = None
2694 support.gc_collect()
2695 self.assertIn(r, str(cm.warning.args[0]))
2696
2697 def test_warn_on_dealloc(self):
2698 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2699 self._check_warn_on_dealloc(support.TESTFN, "wb")
2700 self._check_warn_on_dealloc(support.TESTFN, "w")
2701
2702 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2703 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002704 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002705 for fd in fds:
2706 try:
2707 os.close(fd)
2708 except EnvironmentError as e:
2709 if e.errno != errno.EBADF:
2710 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002711 self.addCleanup(cleanup_fds)
2712 r, w = os.pipe()
2713 fds += r, w
2714 self._check_warn_on_dealloc(r, *args, **kwargs)
2715 # When using closefd=False, there's no warning
2716 r, w = os.pipe()
2717 fds += r, w
2718 with warnings.catch_warnings(record=True) as recorded:
2719 open(r, *args, closefd=False, **kwargs)
2720 support.gc_collect()
2721 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002722
2723 def test_warn_on_dealloc_fd(self):
2724 self._check_warn_on_dealloc_fd("rb", buffering=0)
2725 self._check_warn_on_dealloc_fd("rb")
2726 self._check_warn_on_dealloc_fd("r")
2727
2728
Antoine Pitrou243757e2010-11-05 21:15:39 +00002729 def test_pickling(self):
2730 # Pickling file objects is forbidden
2731 for kwargs in [
2732 {"mode": "w"},
2733 {"mode": "wb"},
2734 {"mode": "wb", "buffering": 0},
2735 {"mode": "r"},
2736 {"mode": "rb"},
2737 {"mode": "rb", "buffering": 0},
2738 {"mode": "w+"},
2739 {"mode": "w+b"},
2740 {"mode": "w+b", "buffering": 0},
2741 ]:
2742 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2743 with self.open(support.TESTFN, **kwargs) as f:
2744 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2745
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002746class CMiscIOTest(MiscIOTest):
2747 io = io
2748
2749class PyMiscIOTest(MiscIOTest):
2750 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002751
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002752
2753@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2754class SignalsTest(unittest.TestCase):
2755
2756 def setUp(self):
2757 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2758
2759 def tearDown(self):
2760 signal.signal(signal.SIGALRM, self.oldalrm)
2761
2762 def alarm_interrupt(self, sig, frame):
2763 1/0
2764
2765 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinnercd1aa0d2011-07-04 11:48:17 +02002766 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2767 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002768 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2769 """Check that a partial write, when it gets interrupted, properly
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002770 invokes the signal handler, and bubbles up the exception raised
2771 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002772 read_results = []
2773 def _read():
2774 s = os.read(r, 1)
2775 read_results.append(s)
2776 t = threading.Thread(target=_read)
2777 t.daemon = True
2778 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00002779 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002780 try:
2781 wio = self.io.open(w, **fdopen_kwargs)
2782 t.start()
2783 signal.alarm(1)
2784 # Fill the pipe enough that the write will be blocking.
2785 # It will be interrupted by the timer armed above. Since the
2786 # other thread has read one byte, the low-level write will
2787 # return with a successful (partial) result rather than an EINTR.
2788 # The buffered IO layer must check for pending signal
2789 # handlers, which in this case will invoke alarm_interrupt().
2790 self.assertRaises(ZeroDivisionError,
2791 wio.write, item * (1024 * 1024))
2792 t.join()
2793 # We got one byte, get another one and check that it isn't a
2794 # repeat of the first one.
2795 read_results.append(os.read(r, 1))
2796 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2797 finally:
2798 os.close(w)
2799 os.close(r)
2800 # This is deliberate. If we didn't close the file descriptor
2801 # before closing wio, wio would try to flush its internal
2802 # buffer, and block again.
2803 try:
2804 wio.close()
2805 except IOError as e:
2806 if e.errno != errno.EBADF:
2807 raise
2808
2809 def test_interrupted_write_unbuffered(self):
2810 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2811
2812 def test_interrupted_write_buffered(self):
2813 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2814
2815 def test_interrupted_write_text(self):
2816 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2817
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002818 def check_reentrant_write(self, data, **fdopen_kwargs):
2819 def on_alarm(*args):
2820 # Will be called reentrantly from the same thread
2821 wio.write(data)
2822 1/0
2823 signal.signal(signal.SIGALRM, on_alarm)
2824 r, w = os.pipe()
2825 wio = self.io.open(w, **fdopen_kwargs)
2826 try:
2827 signal.alarm(1)
2828 # Either the reentrant call to wio.write() fails with RuntimeError,
2829 # or the signal handler raises ZeroDivisionError.
2830 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2831 while 1:
2832 for i in range(100):
2833 wio.write(data)
2834 wio.flush()
2835 # Make sure the buffer doesn't fill up and block further writes
2836 os.read(r, len(data) * 100)
2837 exc = cm.exception
2838 if isinstance(exc, RuntimeError):
2839 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2840 finally:
2841 wio.close()
2842 os.close(r)
2843
2844 def test_reentrant_write_buffered(self):
2845 self.check_reentrant_write(b"xy", mode="wb")
2846
2847 def test_reentrant_write_text(self):
2848 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2849
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002850 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2851 """Check that a buffered read, when it gets interrupted (either
2852 returning a partial result or EINTR), properly invokes the signal
2853 handler and retries if the latter returned successfully."""
2854 r, w = os.pipe()
2855 fdopen_kwargs["closefd"] = False
2856 def alarm_handler(sig, frame):
2857 os.write(w, b"bar")
2858 signal.signal(signal.SIGALRM, alarm_handler)
2859 try:
2860 rio = self.io.open(r, **fdopen_kwargs)
2861 os.write(w, b"foo")
2862 signal.alarm(1)
2863 # Expected behaviour:
2864 # - first raw read() returns partial b"foo"
2865 # - second raw read() returns EINTR
2866 # - third raw read() returns b"bar"
2867 self.assertEqual(decode(rio.read(6)), "foobar")
2868 finally:
2869 rio.close()
2870 os.close(w)
2871 os.close(r)
2872
Antoine Pitrou20db5112011-08-19 20:32:34 +02002873 def test_interrupted_read_retry_buffered(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002874 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2875 mode="rb")
2876
Antoine Pitrou20db5112011-08-19 20:32:34 +02002877 def test_interrupted_read_retry_text(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002878 self.check_interrupted_read_retry(lambda x: x,
2879 mode="r")
2880
2881 @unittest.skipUnless(threading, 'Threading required for this test.')
2882 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2883 """Check that a buffered write, when it gets interrupted (either
2884 returning a partial result or EINTR), properly invokes the signal
2885 handler and retries if the latter returned successfully."""
2886 select = support.import_module("select")
2887 # A quantity that exceeds the buffer size of an anonymous pipe's
2888 # write end.
2889 N = 1024 * 1024
2890 r, w = os.pipe()
2891 fdopen_kwargs["closefd"] = False
2892 # We need a separate thread to read from the pipe and allow the
2893 # write() to finish. This thread is started after the SIGALRM is
2894 # received (forcing a first EINTR in write()).
2895 read_results = []
2896 write_finished = False
2897 def _read():
2898 while not write_finished:
2899 while r in select.select([r], [], [], 1.0)[0]:
2900 s = os.read(r, 1024)
2901 read_results.append(s)
2902 t = threading.Thread(target=_read)
2903 t.daemon = True
2904 def alarm1(sig, frame):
2905 signal.signal(signal.SIGALRM, alarm2)
2906 signal.alarm(1)
2907 def alarm2(sig, frame):
2908 t.start()
2909 signal.signal(signal.SIGALRM, alarm1)
2910 try:
2911 wio = self.io.open(w, **fdopen_kwargs)
2912 signal.alarm(1)
2913 # Expected behaviour:
2914 # - first raw write() is partial (because of the limited pipe buffer
2915 # and the first alarm)
2916 # - second raw write() returns EINTR (because of the second alarm)
2917 # - subsequent write()s are successful (either partial or complete)
2918 self.assertEqual(N, wio.write(item * N))
2919 wio.flush()
2920 write_finished = True
2921 t.join()
2922 self.assertEqual(N, sum(len(x) for x in read_results))
2923 finally:
2924 write_finished = True
2925 os.close(w)
2926 os.close(r)
2927 # This is deliberate. If we didn't close the file descriptor
2928 # before closing wio, wio would try to flush its internal
2929 # buffer, and could block (in case of failure).
2930 try:
2931 wio.close()
2932 except IOError as e:
2933 if e.errno != errno.EBADF:
2934 raise
2935
Antoine Pitrou20db5112011-08-19 20:32:34 +02002936 def test_interrupted_write_retry_buffered(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002937 self.check_interrupted_write_retry(b"x", mode="wb")
2938
Antoine Pitrou20db5112011-08-19 20:32:34 +02002939 def test_interrupted_write_retry_text(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002940 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2941
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002942
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002943class CSignalsTest(SignalsTest):
2944 io = io
2945
2946class PySignalsTest(SignalsTest):
2947 io = pyio
2948
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002949 # Handling reentrancy issues would slow down _pyio even more, so the
2950 # tests are disabled.
2951 test_reentrant_write_buffered = None
2952 test_reentrant_write_text = None
2953
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002954
Guido van Rossum28524c72007-02-27 05:47:44 +00002955def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002956 tests = (CIOTest, PyIOTest,
2957 CBufferedReaderTest, PyBufferedReaderTest,
2958 CBufferedWriterTest, PyBufferedWriterTest,
2959 CBufferedRWPairTest, PyBufferedRWPairTest,
2960 CBufferedRandomTest, PyBufferedRandomTest,
2961 StatefulIncrementalDecoderTest,
2962 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2963 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002964 CMiscIOTest, PyMiscIOTest,
2965 CSignalsTest, PySignalsTest,
2966 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002967
2968 # Put the namespaces of the IO module we are testing and some useful mock
2969 # classes in the __dict__ of each test.
2970 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00002971 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002972 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2973 c_io_ns = {name : getattr(io, name) for name in all_members}
2974 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2975 globs = globals()
2976 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2977 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2978 # Avoid turning open into a bound method.
2979 py_io_ns["open"] = pyio.OpenWrapper
2980 for test in tests:
2981 if test.__name__.startswith("C"):
2982 for name, obj in c_io_ns.items():
2983 setattr(test, name, obj)
2984 elif test.__name__.startswith("Py"):
2985 for name, obj in py_io_ns.items():
2986 setattr(test, name, obj)
2987
2988 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002989
2990if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002991 test_main()