blob: 0dc9d6dff42e404ee3db8f830b2a7d5098bdfcff [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000045
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000047def _default_chunk_size():
48 """Get the default TextIOWrapper chunk size"""
49 with open(__file__, "r", encoding="latin1") as f:
50 return f._CHUNK_SIZE
51
52
Antoine Pitrou328ec742010-09-14 18:37:24 +000053class MockRawIOWithoutRead:
54 """A RawIO implementation without read(), so as to exercise the default
55 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000056
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000057 def __init__(self, read_stack=()):
58 self._read_stack = list(read_stack)
59 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000060 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000061 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000062
Guido van Rossum01a27522007-03-07 01:00:12 +000063 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000065 return len(b)
66
67 def writable(self):
68 return True
69
Guido van Rossum68bbcd22007-02-27 17:19:33 +000070 def fileno(self):
71 return 42
72
73 def readable(self):
74 return True
75
Guido van Rossum01a27522007-03-07 01:00:12 +000076 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000077 return True
78
Guido van Rossum01a27522007-03-07 01:00:12 +000079 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000080 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000081
82 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000083 return 0 # same comment as above
84
85 def readinto(self, buf):
86 self._reads += 1
87 max_len = len(buf)
88 try:
89 data = self._read_stack[0]
90 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000091 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000092 return 0
93 if data is None:
94 del self._read_stack[0]
95 return None
96 n = len(data)
97 if len(data) <= max_len:
98 del self._read_stack[0]
99 buf[:n] = data
100 return n
101 else:
102 buf[:] = data[:max_len]
103 self._read_stack[0] = data[max_len:]
104 return max_len
105
106 def truncate(self, pos=None):
107 return pos
108
Antoine Pitrou328ec742010-09-14 18:37:24 +0000109class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
110 pass
111
112class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
113 pass
114
115
116class MockRawIO(MockRawIOWithoutRead):
117
118 def read(self, n=None):
119 self._reads += 1
120 try:
121 return self._read_stack.pop(0)
122 except:
123 self._extraneous_reads += 1
124 return b""
125
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000126class CMockRawIO(MockRawIO, io.RawIOBase):
127 pass
128
129class PyMockRawIO(MockRawIO, pyio.RawIOBase):
130 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000131
Guido van Rossuma9e20242007-03-08 00:43:48 +0000132
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000133class MisbehavedRawIO(MockRawIO):
134 def write(self, b):
135 return super().write(b) * 2
136
137 def read(self, n=None):
138 return super().read(n) * 2
139
140 def seek(self, pos, whence):
141 return -123
142
143 def tell(self):
144 return -456
145
146 def readinto(self, buf):
147 super().readinto(buf)
148 return len(buf) * 5
149
150class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
151 pass
152
153class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
154 pass
155
156
157class CloseFailureIO(MockRawIO):
158 closed = 0
159
160 def close(self):
161 if not self.closed:
162 self.closed = 1
163 raise IOError
164
165class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
166 pass
167
168class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
169 pass
170
171
172class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000173
174 def __init__(self, data):
175 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000176 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000177
178 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000179 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000180 self.read_history.append(None if res is None else len(res))
181 return res
182
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000183 def readinto(self, b):
184 res = super().readinto(b)
185 self.read_history.append(res)
186 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188class CMockFileIO(MockFileIO, io.BytesIO):
189 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000190
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000191class PyMockFileIO(MockFileIO, pyio.BytesIO):
192 pass
193
194
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000195class MockUnseekableIO:
196 def seekable(self):
197 return False
198
199 def seek(self, *args):
200 raise self.UnsupportedOperation("not seekable")
201
202 def tell(self, *args):
203 raise self.UnsupportedOperation("not seekable")
204
205class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
206 UnsupportedOperation = io.UnsupportedOperation
207
208class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
209 UnsupportedOperation = pyio.UnsupportedOperation
210
211
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000212class MockNonBlockWriterIO:
213
214 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000215 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000216 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000217
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000218 def pop_written(self):
219 s = b"".join(self._write_stack)
220 self._write_stack[:] = []
221 return s
222
223 def block_on(self, char):
224 """Block when a given char is encountered."""
225 self._blocker_char = char
226
227 def readable(self):
228 return True
229
230 def seekable(self):
231 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000232
Guido van Rossum01a27522007-03-07 01:00:12 +0000233 def writable(self):
234 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000235
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000236 def write(self, b):
237 b = bytes(b)
238 n = -1
239 if self._blocker_char:
240 try:
241 n = b.index(self._blocker_char)
242 except ValueError:
243 pass
244 else:
245 self._blocker_char = None
246 self._write_stack.append(b[:n])
247 raise self.BlockingIOError(0, "test blocking", n)
248 self._write_stack.append(b)
249 return len(b)
250
251class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
252 BlockingIOError = io.BlockingIOError
253
254class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
255 BlockingIOError = pyio.BlockingIOError
256
Guido van Rossuma9e20242007-03-08 00:43:48 +0000257
Guido van Rossum28524c72007-02-27 05:47:44 +0000258class IOTest(unittest.TestCase):
259
Neal Norwitze7789b12008-03-24 06:18:09 +0000260 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000261 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000262
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000263 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000264 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000265
Guido van Rossum28524c72007-02-27 05:47:44 +0000266 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000267 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000268 f.truncate(0)
269 self.assertEqual(f.tell(), 5)
270 f.seek(0)
271
272 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000273 self.assertEqual(f.seek(0), 0)
274 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000275 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000276 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000277 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000278 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000279 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000280 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000281 self.assertEqual(f.seek(-1, 2), 13)
282 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000283
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000285 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000286 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000287
Guido van Rossum9b76da62007-04-11 01:09:03 +0000288 def read_ops(self, f, buffered=False):
289 data = f.read(5)
290 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000291 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000292 self.assertEqual(f.readinto(data), 5)
293 self.assertEqual(data, b" worl")
294 self.assertEqual(f.readinto(data), 2)
295 self.assertEqual(len(data), 5)
296 self.assertEqual(data[:2], b"d\n")
297 self.assertEqual(f.seek(0), 0)
298 self.assertEqual(f.read(20), b"hello world\n")
299 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000300 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000301 self.assertEqual(f.seek(-6, 2), 6)
302 self.assertEqual(f.read(5), b"world")
303 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000304 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000305 self.assertEqual(f.seek(-6, 1), 5)
306 self.assertEqual(f.read(5), b" worl")
307 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000308 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000309 if buffered:
310 f.seek(0)
311 self.assertEqual(f.read(), b"hello world\n")
312 f.seek(6)
313 self.assertEqual(f.read(), b"world\n")
314 self.assertEqual(f.read(), b"")
315
Guido van Rossum34d69e52007-04-10 20:08:41 +0000316 LARGE = 2**31
317
Guido van Rossum53807da2007-04-10 19:01:47 +0000318 def large_file_ops(self, f):
319 assert f.readable()
320 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000321 self.assertEqual(f.seek(self.LARGE), self.LARGE)
322 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000323 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000324 self.assertEqual(f.tell(), self.LARGE + 3)
325 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000326 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000327 self.assertEqual(f.tell(), self.LARGE + 2)
328 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000329 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000330 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000331 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
332 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000333 self.assertEqual(f.read(2), b"x")
334
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000335 def test_invalid_operations(self):
336 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000337 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000338 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000339 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000340 self.assertRaises(exc, fp.read)
341 self.assertRaises(exc, fp.readline)
342 with self.open(support.TESTFN, "wb", buffering=0) as fp:
343 self.assertRaises(exc, fp.read)
344 self.assertRaises(exc, fp.readline)
345 with self.open(support.TESTFN, "rb", buffering=0) as fp:
346 self.assertRaises(exc, fp.write, b"blah")
347 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000348 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000349 self.assertRaises(exc, fp.write, b"blah")
350 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000351 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000352 self.assertRaises(exc, fp.write, "blah")
353 self.assertRaises(exc, fp.writelines, ["blah\n"])
354 # Non-zero seeking from current or end pos
355 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
356 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000357
Guido van Rossum28524c72007-02-27 05:47:44 +0000358 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000359 with self.open(support.TESTFN, "wb", buffering=0) as f:
360 self.assertEqual(f.readable(), False)
361 self.assertEqual(f.writable(), True)
362 self.assertEqual(f.seekable(), True)
363 self.write_ops(f)
364 with self.open(support.TESTFN, "rb", buffering=0) as f:
365 self.assertEqual(f.readable(), True)
366 self.assertEqual(f.writable(), False)
367 self.assertEqual(f.seekable(), True)
368 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000369
Guido van Rossum87429772007-04-10 21:06:59 +0000370 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000371 with self.open(support.TESTFN, "wb") as f:
372 self.assertEqual(f.readable(), False)
373 self.assertEqual(f.writable(), True)
374 self.assertEqual(f.seekable(), True)
375 self.write_ops(f)
376 with self.open(support.TESTFN, "rb") as f:
377 self.assertEqual(f.readable(), True)
378 self.assertEqual(f.writable(), False)
379 self.assertEqual(f.seekable(), True)
380 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000381
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000382 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000383 with self.open(support.TESTFN, "wb") as f:
384 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
385 with self.open(support.TESTFN, "rb") as f:
386 self.assertEqual(f.readline(), b"abc\n")
387 self.assertEqual(f.readline(10), b"def\n")
388 self.assertEqual(f.readline(2), b"xy")
389 self.assertEqual(f.readline(4), b"zzy\n")
390 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000391 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000392 self.assertRaises(TypeError, f.readline, 5.3)
393 with self.open(support.TESTFN, "r") as f:
394 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000395
Guido van Rossum28524c72007-02-27 05:47:44 +0000396 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000397 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000398 self.write_ops(f)
399 data = f.getvalue()
400 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000401 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000402 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000403
Guido van Rossum53807da2007-04-10 19:01:47 +0000404 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000405 # On Windows and Mac OSX this test comsumes large resources; It takes
406 # a long time to build the >2GB file and takes >2GB of disk space
407 # therefore the resource must be enabled to run this test.
408 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000409 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000410 print("\nTesting large file ops skipped on %s." % sys.platform,
411 file=sys.stderr)
412 print("It requires %d bytes and a long time." % self.LARGE,
413 file=sys.stderr)
414 print("Use 'regrtest.py -u largefile test_io' to run it.",
415 file=sys.stderr)
416 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000417 with self.open(support.TESTFN, "w+b", 0) as f:
418 self.large_file_ops(f)
419 with self.open(support.TESTFN, "w+b") as f:
420 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000421
422 def test_with_open(self):
423 for bufsize in (0, 1, 100):
424 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000425 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000426 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000427 self.assertEqual(f.closed, True)
428 f = None
429 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000430 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000431 1/0
432 except ZeroDivisionError:
433 self.assertEqual(f.closed, True)
434 else:
435 self.fail("1/0 didn't raise an exception")
436
Antoine Pitrou08838b62009-01-21 00:55:13 +0000437 # issue 5008
438 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000439 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000440 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000441 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000442 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000443 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000444 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000445 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000446 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000447
Guido van Rossum87429772007-04-10 21:06:59 +0000448 def test_destructor(self):
449 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000450 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000451 def __del__(self):
452 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 try:
454 f = super().__del__
455 except AttributeError:
456 pass
457 else:
458 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000459 def close(self):
460 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000461 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000462 def flush(self):
463 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000464 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000465 with support.check_warnings(('', ResourceWarning)):
466 f = MyFileIO(support.TESTFN, "wb")
467 f.write(b"xxx")
468 del f
469 support.gc_collect()
470 self.assertEqual(record, [1, 2, 3])
471 with self.open(support.TESTFN, "rb") as f:
472 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000473
474 def _check_base_destructor(self, base):
475 record = []
476 class MyIO(base):
477 def __init__(self):
478 # This exercises the availability of attributes on object
479 # destruction.
480 # (in the C version, close() is called by the tp_dealloc
481 # function, not by __del__)
482 self.on_del = 1
483 self.on_close = 2
484 self.on_flush = 3
485 def __del__(self):
486 record.append(self.on_del)
487 try:
488 f = super().__del__
489 except AttributeError:
490 pass
491 else:
492 f()
493 def close(self):
494 record.append(self.on_close)
495 super().close()
496 def flush(self):
497 record.append(self.on_flush)
498 super().flush()
499 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000500 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000501 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000502 self.assertEqual(record, [1, 2, 3])
503
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000504 def test_IOBase_destructor(self):
505 self._check_base_destructor(self.IOBase)
506
507 def test_RawIOBase_destructor(self):
508 self._check_base_destructor(self.RawIOBase)
509
510 def test_BufferedIOBase_destructor(self):
511 self._check_base_destructor(self.BufferedIOBase)
512
513 def test_TextIOBase_destructor(self):
514 self._check_base_destructor(self.TextIOBase)
515
Guido van Rossum87429772007-04-10 21:06:59 +0000516 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000517 with self.open(support.TESTFN, "wb") as f:
518 f.write(b"xxx")
519 with self.open(support.TESTFN, "rb") as f:
520 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000521
Guido van Rossumd4103952007-04-12 05:44:49 +0000522 def test_array_writes(self):
523 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000524 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000525 with self.open(support.TESTFN, "wb", 0) as f:
526 self.assertEqual(f.write(a), n)
527 with self.open(support.TESTFN, "wb") as f:
528 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000529
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000530 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000531 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000532 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000533
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000534 def test_read_closed(self):
535 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000536 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000537 with self.open(support.TESTFN, "r") as f:
538 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000539 self.assertEqual(file.read(), "egg\n")
540 file.seek(0)
541 file.close()
542 self.assertRaises(ValueError, file.read)
543
544 def test_no_closefd_with_filename(self):
545 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000546 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000547
548 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000549 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000550 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000551 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000552 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000553 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000554 self.assertEqual(file.buffer.raw.closefd, False)
555
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000556 def test_garbage_collection(self):
557 # FileIO objects are collected, and collecting them flushes
558 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000559 with support.check_warnings(('', ResourceWarning)):
560 f = self.FileIO(support.TESTFN, "wb")
561 f.write(b"abcxxx")
562 f.f = f
563 wr = weakref.ref(f)
564 del f
565 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000566 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000567 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000568 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000569
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000570 def test_unbounded_file(self):
571 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
572 zero = "/dev/zero"
573 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000574 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000575 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000576 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000577 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000578 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000579 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000580 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000581 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000582 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000583 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000584 self.assertRaises(OverflowError, f.read)
585
Antoine Pitrou6be88762010-05-03 16:48:20 +0000586 def test_flush_error_on_close(self):
587 f = self.open(support.TESTFN, "wb", buffering=0)
588 def bad_flush():
589 raise IOError()
590 f.flush = bad_flush
591 self.assertRaises(IOError, f.close) # exception not swallowed
592
593 def test_multi_close(self):
594 f = self.open(support.TESTFN, "wb", buffering=0)
595 f.close()
596 f.close()
597 f.close()
598 self.assertRaises(ValueError, f.flush)
599
Antoine Pitrou328ec742010-09-14 18:37:24 +0000600 def test_RawIOBase_read(self):
601 # Exercise the default RawIOBase.read() implementation (which calls
602 # readinto() internally).
603 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
604 self.assertEqual(rawio.read(2), b"ab")
605 self.assertEqual(rawio.read(2), b"c")
606 self.assertEqual(rawio.read(2), b"d")
607 self.assertEqual(rawio.read(2), None)
608 self.assertEqual(rawio.read(2), b"ef")
609 self.assertEqual(rawio.read(2), b"g")
610 self.assertEqual(rawio.read(2), None)
611 self.assertEqual(rawio.read(2), b"")
612
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400613 def test_types_have_dict(self):
614 test = (
615 self.IOBase(),
616 self.RawIOBase(),
617 self.TextIOBase(),
618 self.StringIO(),
619 self.BytesIO()
620 )
621 for obj in test:
622 self.assertTrue(hasattr(obj, "__dict__"))
623
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000624class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200625
626 def test_IOBase_finalize(self):
627 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
628 # class which inherits IOBase and an object of this class are caught
629 # in a reference cycle and close() is already in the method cache.
630 class MyIO(self.IOBase):
631 def close(self):
632 pass
633
634 # create an instance to populate the method cache
635 MyIO()
636 obj = MyIO()
637 obj.obj = obj
638 wr = weakref.ref(obj)
639 del MyIO
640 del obj
641 support.gc_collect()
642 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000643
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000644class PyIOTest(IOTest):
645 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000646
Guido van Rossuma9e20242007-03-08 00:43:48 +0000647
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000648class CommonBufferedTests:
649 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
650
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000651 def test_detach(self):
652 raw = self.MockRawIO()
653 buf = self.tp(raw)
654 self.assertIs(buf.detach(), raw)
655 self.assertRaises(ValueError, buf.detach)
656
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000657 def test_fileno(self):
658 rawio = self.MockRawIO()
659 bufio = self.tp(rawio)
660
Ezio Melottib3aedd42010-11-20 19:04:17 +0000661 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000662
663 def test_no_fileno(self):
664 # XXX will we always have fileno() function? If so, kill
665 # this test. Else, write it.
666 pass
667
668 def test_invalid_args(self):
669 rawio = self.MockRawIO()
670 bufio = self.tp(rawio)
671 # Invalid whence
672 self.assertRaises(ValueError, bufio.seek, 0, -1)
673 self.assertRaises(ValueError, bufio.seek, 0, 3)
674
675 def test_override_destructor(self):
676 tp = self.tp
677 record = []
678 class MyBufferedIO(tp):
679 def __del__(self):
680 record.append(1)
681 try:
682 f = super().__del__
683 except AttributeError:
684 pass
685 else:
686 f()
687 def close(self):
688 record.append(2)
689 super().close()
690 def flush(self):
691 record.append(3)
692 super().flush()
693 rawio = self.MockRawIO()
694 bufio = MyBufferedIO(rawio)
695 writable = bufio.writable()
696 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000697 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000698 if writable:
699 self.assertEqual(record, [1, 2, 3])
700 else:
701 self.assertEqual(record, [1, 2])
702
703 def test_context_manager(self):
704 # Test usability as a context manager
705 rawio = self.MockRawIO()
706 bufio = self.tp(rawio)
707 def _with():
708 with bufio:
709 pass
710 _with()
711 # bufio should now be closed, and using it a second time should raise
712 # a ValueError.
713 self.assertRaises(ValueError, _with)
714
715 def test_error_through_destructor(self):
716 # Test that the exception state is not modified by a destructor,
717 # even if close() fails.
718 rawio = self.CloseFailureIO()
719 def f():
720 self.tp(rawio).xyzzy
721 with support.captured_output("stderr") as s:
722 self.assertRaises(AttributeError, f)
723 s = s.getvalue().strip()
724 if s:
725 # The destructor *may* have printed an unraisable error, check it
726 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000727 self.assertTrue(s.startswith("Exception IOError: "), s)
728 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000729
Antoine Pitrou716c4442009-05-23 19:04:03 +0000730 def test_repr(self):
731 raw = self.MockRawIO()
732 b = self.tp(raw)
733 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
734 self.assertEqual(repr(b), "<%s>" % clsname)
735 raw.name = "dummy"
736 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
737 raw.name = b"dummy"
738 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
739
Antoine Pitrou6be88762010-05-03 16:48:20 +0000740 def test_flush_error_on_close(self):
741 raw = self.MockRawIO()
742 def bad_flush():
743 raise IOError()
744 raw.flush = bad_flush
745 b = self.tp(raw)
746 self.assertRaises(IOError, b.close) # exception not swallowed
747
748 def test_multi_close(self):
749 raw = self.MockRawIO()
750 b = self.tp(raw)
751 b.close()
752 b.close()
753 b.close()
754 self.assertRaises(ValueError, b.flush)
755
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000756 def test_unseekable(self):
757 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
758 self.assertRaises(self.UnsupportedOperation, bufio.tell)
759 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
760
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000761 def test_readonly_attributes(self):
762 raw = self.MockRawIO()
763 buf = self.tp(raw)
764 x = self.MockRawIO()
765 with self.assertRaises(AttributeError):
766 buf.raw = x
767
Guido van Rossum78892e42007-04-06 17:31:18 +0000768
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000769class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
770 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000771
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000772 def test_constructor(self):
773 rawio = self.MockRawIO([b"abc"])
774 bufio = self.tp(rawio)
775 bufio.__init__(rawio)
776 bufio.__init__(rawio, buffer_size=1024)
777 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000778 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000779 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
780 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
781 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
782 rawio = self.MockRawIO([b"abc"])
783 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000784 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000785
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000786 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000787 for arg in (None, 7):
788 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
789 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000790 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000791 # Invalid args
792 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000793
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000794 def test_read1(self):
795 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
796 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000797 self.assertEqual(b"a", bufio.read(1))
798 self.assertEqual(b"b", bufio.read1(1))
799 self.assertEqual(rawio._reads, 1)
800 self.assertEqual(b"c", bufio.read1(100))
801 self.assertEqual(rawio._reads, 1)
802 self.assertEqual(b"d", bufio.read1(100))
803 self.assertEqual(rawio._reads, 2)
804 self.assertEqual(b"efg", bufio.read1(100))
805 self.assertEqual(rawio._reads, 3)
806 self.assertEqual(b"", bufio.read1(100))
807 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000808 # Invalid args
809 self.assertRaises(ValueError, bufio.read1, -1)
810
811 def test_readinto(self):
812 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
813 bufio = self.tp(rawio)
814 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000815 self.assertEqual(bufio.readinto(b), 2)
816 self.assertEqual(b, b"ab")
817 self.assertEqual(bufio.readinto(b), 2)
818 self.assertEqual(b, b"cd")
819 self.assertEqual(bufio.readinto(b), 2)
820 self.assertEqual(b, b"ef")
821 self.assertEqual(bufio.readinto(b), 1)
822 self.assertEqual(b, b"gf")
823 self.assertEqual(bufio.readinto(b), 0)
824 self.assertEqual(b, b"gf")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000825
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000826 def test_readlines(self):
827 def bufio():
828 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
829 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000830 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
831 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
832 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000833
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000834 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000835 data = b"abcdefghi"
836 dlen = len(data)
837
838 tests = [
839 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
840 [ 100, [ 3, 3, 3], [ dlen ] ],
841 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
842 ]
843
844 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000845 rawio = self.MockFileIO(data)
846 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000847 pos = 0
848 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000849 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000850 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000851 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000852 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000853
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000854 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000855 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000856 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
857 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000858 self.assertEqual(b"abcd", bufio.read(6))
859 self.assertEqual(b"e", bufio.read(1))
860 self.assertEqual(b"fg", bufio.read())
861 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200862 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000863 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000864
Victor Stinnera80987f2011-05-25 22:47:16 +0200865 rawio = self.MockRawIO((b"a", None, None))
866 self.assertEqual(b"a", rawio.readall())
867 self.assertIsNone(rawio.readall())
868
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000869 def test_read_past_eof(self):
870 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
871 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000872
Ezio Melottib3aedd42010-11-20 19:04:17 +0000873 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000874
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000875 def test_read_all(self):
876 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
877 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000878
Ezio Melottib3aedd42010-11-20 19:04:17 +0000879 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000880
Victor Stinner45df8202010-04-28 22:31:17 +0000881 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000882 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000883 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000884 try:
885 # Write out many bytes with exactly the same number of 0's,
886 # 1's... 255's. This will help us check that concurrent reading
887 # doesn't duplicate or forget contents.
888 N = 1000
889 l = list(range(256)) * N
890 random.shuffle(l)
891 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000892 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000893 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000894 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000895 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000896 errors = []
897 results = []
898 def f():
899 try:
900 # Intra-buffer read then buffer-flushing read
901 for n in cycle([1, 19]):
902 s = bufio.read(n)
903 if not s:
904 break
905 # list.append() is atomic
906 results.append(s)
907 except Exception as e:
908 errors.append(e)
909 raise
910 threads = [threading.Thread(target=f) for x in range(20)]
911 for t in threads:
912 t.start()
913 time.sleep(0.02) # yield
914 for t in threads:
915 t.join()
916 self.assertFalse(errors,
917 "the following exceptions were caught: %r" % errors)
918 s = b''.join(results)
919 for i in range(256):
920 c = bytes(bytearray([i]))
921 self.assertEqual(s.count(c), N)
922 finally:
923 support.unlink(support.TESTFN)
924
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000925 def test_misbehaved_io(self):
926 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
927 bufio = self.tp(rawio)
928 self.assertRaises(IOError, bufio.seek, 0)
929 self.assertRaises(IOError, bufio.tell)
930
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000931 def test_no_extraneous_read(self):
932 # Issue #9550; when the raw IO object has satisfied the read request,
933 # we should not issue any additional reads, otherwise it may block
934 # (e.g. socket).
935 bufsize = 16
936 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
937 rawio = self.MockRawIO([b"x" * n])
938 bufio = self.tp(rawio, bufsize)
939 self.assertEqual(bufio.read(n), b"x" * n)
940 # Simple case: one raw read is enough to satisfy the request.
941 self.assertEqual(rawio._extraneous_reads, 0,
942 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
943 # A more complex case where two raw reads are needed to satisfy
944 # the request.
945 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
946 bufio = self.tp(rawio, bufsize)
947 self.assertEqual(bufio.read(n), b"x" * n)
948 self.assertEqual(rawio._extraneous_reads, 0,
949 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
950
951
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000952class CBufferedReaderTest(BufferedReaderTest):
953 tp = io.BufferedReader
954
955 def test_constructor(self):
956 BufferedReaderTest.test_constructor(self)
957 # The allocation can succeed on 32-bit builds, e.g. with more
958 # than 2GB RAM and a 64-bit kernel.
959 if sys.maxsize > 0x7FFFFFFF:
960 rawio = self.MockRawIO()
961 bufio = self.tp(rawio)
962 self.assertRaises((OverflowError, MemoryError, ValueError),
963 bufio.__init__, rawio, sys.maxsize)
964
965 def test_initialization(self):
966 rawio = self.MockRawIO([b"abc"])
967 bufio = self.tp(rawio)
968 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
969 self.assertRaises(ValueError, bufio.read)
970 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
971 self.assertRaises(ValueError, bufio.read)
972 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
973 self.assertRaises(ValueError, bufio.read)
974
975 def test_misbehaved_io_read(self):
976 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
977 bufio = self.tp(rawio)
978 # _pyio.BufferedReader seems to implement reading different, so that
979 # checking this is not so easy.
980 self.assertRaises(IOError, bufio.read, 10)
981
982 def test_garbage_collection(self):
983 # C BufferedReader objects are collected.
984 # The Python version has __del__, so it ends into gc.garbage instead
985 rawio = self.FileIO(support.TESTFN, "w+b")
986 f = self.tp(rawio)
987 f.f = f
988 wr = weakref.ref(f)
989 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000990 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000991 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000992
993class PyBufferedReaderTest(BufferedReaderTest):
994 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000995
Guido van Rossuma9e20242007-03-08 00:43:48 +0000996
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000997class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
998 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000999
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001000 def test_constructor(self):
1001 rawio = self.MockRawIO()
1002 bufio = self.tp(rawio)
1003 bufio.__init__(rawio)
1004 bufio.__init__(rawio, buffer_size=1024)
1005 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001006 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001007 bufio.flush()
1008 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1009 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1010 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1011 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001012 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001013 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001014 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001015
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001016 def test_detach_flush(self):
1017 raw = self.MockRawIO()
1018 buf = self.tp(raw)
1019 buf.write(b"howdy!")
1020 self.assertFalse(raw._write_stack)
1021 buf.detach()
1022 self.assertEqual(raw._write_stack, [b"howdy!"])
1023
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001024 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001025 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001026 writer = self.MockRawIO()
1027 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001028 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001029 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001030
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001031 def test_write_overflow(self):
1032 writer = self.MockRawIO()
1033 bufio = self.tp(writer, 8)
1034 contents = b"abcdefghijklmnop"
1035 for n in range(0, len(contents), 3):
1036 bufio.write(contents[n:n+3])
1037 flushed = b"".join(writer._write_stack)
1038 # At least (total - 8) bytes were implicitly flushed, perhaps more
1039 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001040 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001041
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001042 def check_writes(self, intermediate_func):
1043 # Lots of writes, test the flushed output is as expected.
1044 contents = bytes(range(256)) * 1000
1045 n = 0
1046 writer = self.MockRawIO()
1047 bufio = self.tp(writer, 13)
1048 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1049 def gen_sizes():
1050 for size in count(1):
1051 for i in range(15):
1052 yield size
1053 sizes = gen_sizes()
1054 while n < len(contents):
1055 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001056 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001057 intermediate_func(bufio)
1058 n += size
1059 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001060 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001061
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001062 def test_writes(self):
1063 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001064
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001065 def test_writes_and_flushes(self):
1066 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001067
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001068 def test_writes_and_seeks(self):
1069 def _seekabs(bufio):
1070 pos = bufio.tell()
1071 bufio.seek(pos + 1, 0)
1072 bufio.seek(pos - 1, 0)
1073 bufio.seek(pos, 0)
1074 self.check_writes(_seekabs)
1075 def _seekrel(bufio):
1076 pos = bufio.seek(0, 1)
1077 bufio.seek(+1, 1)
1078 bufio.seek(-1, 1)
1079 bufio.seek(pos, 0)
1080 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001081
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001082 def test_writes_and_truncates(self):
1083 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001084
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001085 def test_write_non_blocking(self):
1086 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001087 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001088
Ezio Melottib3aedd42010-11-20 19:04:17 +00001089 self.assertEqual(bufio.write(b"abcd"), 4)
1090 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001091 # 1 byte will be written, the rest will be buffered
1092 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001093 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001094
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001095 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1096 raw.block_on(b"0")
1097 try:
1098 bufio.write(b"opqrwxyz0123456789")
1099 except self.BlockingIOError as e:
1100 written = e.characters_written
1101 else:
1102 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001103 self.assertEqual(written, 16)
1104 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001105 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001106
Ezio Melottib3aedd42010-11-20 19:04:17 +00001107 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001108 s = raw.pop_written()
1109 # Previously buffered bytes were flushed
1110 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001111
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001112 def test_write_and_rewind(self):
1113 raw = io.BytesIO()
1114 bufio = self.tp(raw, 4)
1115 self.assertEqual(bufio.write(b"abcdef"), 6)
1116 self.assertEqual(bufio.tell(), 6)
1117 bufio.seek(0, 0)
1118 self.assertEqual(bufio.write(b"XY"), 2)
1119 bufio.seek(6, 0)
1120 self.assertEqual(raw.getvalue(), b"XYcdef")
1121 self.assertEqual(bufio.write(b"123456"), 6)
1122 bufio.flush()
1123 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001124
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001125 def test_flush(self):
1126 writer = self.MockRawIO()
1127 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001128 bufio.write(b"abc")
1129 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001130 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001131
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001132 def test_destructor(self):
1133 writer = self.MockRawIO()
1134 bufio = self.tp(writer, 8)
1135 bufio.write(b"abc")
1136 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001137 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001138 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001139
1140 def test_truncate(self):
1141 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001142 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001143 bufio = self.tp(raw, 8)
1144 bufio.write(b"abcdef")
1145 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001146 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001147 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001148 self.assertEqual(f.read(), b"abc")
1149
Victor Stinner45df8202010-04-28 22:31:17 +00001150 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001151 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001152 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001153 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001154 # Write out many bytes from many threads and test they were
1155 # all flushed.
1156 N = 1000
1157 contents = bytes(range(256)) * N
1158 sizes = cycle([1, 19])
1159 n = 0
1160 queue = deque()
1161 while n < len(contents):
1162 size = next(sizes)
1163 queue.append(contents[n:n+size])
1164 n += size
1165 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001166 # We use a real file object because it allows us to
1167 # exercise situations where the GIL is released before
1168 # writing the buffer to the raw streams. This is in addition
1169 # to concurrency issues due to switching threads in the middle
1170 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001171 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001172 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001173 errors = []
1174 def f():
1175 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001176 while True:
1177 try:
1178 s = queue.popleft()
1179 except IndexError:
1180 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001181 bufio.write(s)
1182 except Exception as e:
1183 errors.append(e)
1184 raise
1185 threads = [threading.Thread(target=f) for x in range(20)]
1186 for t in threads:
1187 t.start()
1188 time.sleep(0.02) # yield
1189 for t in threads:
1190 t.join()
1191 self.assertFalse(errors,
1192 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001193 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001194 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001195 s = f.read()
1196 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001197 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001198 finally:
1199 support.unlink(support.TESTFN)
1200
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001201 def test_misbehaved_io(self):
1202 rawio = self.MisbehavedRawIO()
1203 bufio = self.tp(rawio, 5)
1204 self.assertRaises(IOError, bufio.seek, 0)
1205 self.assertRaises(IOError, bufio.tell)
1206 self.assertRaises(IOError, bufio.write, b"abcdef")
1207
Benjamin Peterson59406a92009-03-26 17:10:29 +00001208 def test_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001209 with support.check_warnings(("max_buffer_size is deprecated",
1210 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001211 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001212
1213
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001214class CBufferedWriterTest(BufferedWriterTest):
1215 tp = io.BufferedWriter
1216
1217 def test_constructor(self):
1218 BufferedWriterTest.test_constructor(self)
1219 # The allocation can succeed on 32-bit builds, e.g. with more
1220 # than 2GB RAM and a 64-bit kernel.
1221 if sys.maxsize > 0x7FFFFFFF:
1222 rawio = self.MockRawIO()
1223 bufio = self.tp(rawio)
1224 self.assertRaises((OverflowError, MemoryError, ValueError),
1225 bufio.__init__, rawio, sys.maxsize)
1226
1227 def test_initialization(self):
1228 rawio = self.MockRawIO()
1229 bufio = self.tp(rawio)
1230 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1231 self.assertRaises(ValueError, bufio.write, b"def")
1232 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1233 self.assertRaises(ValueError, bufio.write, b"def")
1234 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1235 self.assertRaises(ValueError, bufio.write, b"def")
1236
1237 def test_garbage_collection(self):
1238 # C BufferedWriter objects are collected, and collecting them flushes
1239 # all data to disk.
1240 # The Python version has __del__, so it ends into gc.garbage instead
1241 rawio = self.FileIO(support.TESTFN, "w+b")
1242 f = self.tp(rawio)
1243 f.write(b"123xxx")
1244 f.x = f
1245 wr = weakref.ref(f)
1246 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001247 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001248 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001249 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001250 self.assertEqual(f.read(), b"123xxx")
1251
1252
1253class PyBufferedWriterTest(BufferedWriterTest):
1254 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001255
Guido van Rossum01a27522007-03-07 01:00:12 +00001256class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001257
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001258 def test_constructor(self):
1259 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001260 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001261
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001262 def test_detach(self):
1263 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1264 self.assertRaises(self.UnsupportedOperation, pair.detach)
1265
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001266 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001267 with support.check_warnings(("max_buffer_size is deprecated",
1268 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001269 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001270
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001271 def test_constructor_with_not_readable(self):
1272 class NotReadable(MockRawIO):
1273 def readable(self):
1274 return False
1275
1276 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1277
1278 def test_constructor_with_not_writeable(self):
1279 class NotWriteable(MockRawIO):
1280 def writable(self):
1281 return False
1282
1283 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1284
1285 def test_read(self):
1286 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1287
1288 self.assertEqual(pair.read(3), b"abc")
1289 self.assertEqual(pair.read(1), b"d")
1290 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001291 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1292 self.assertEqual(pair.read(None), b"abc")
1293
1294 def test_readlines(self):
1295 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1296 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1297 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1298 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001299
1300 def test_read1(self):
1301 # .read1() is delegated to the underlying reader object, so this test
1302 # can be shallow.
1303 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1304
1305 self.assertEqual(pair.read1(3), b"abc")
1306
1307 def test_readinto(self):
1308 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1309
1310 data = bytearray(5)
1311 self.assertEqual(pair.readinto(data), 5)
1312 self.assertEqual(data, b"abcde")
1313
1314 def test_write(self):
1315 w = self.MockRawIO()
1316 pair = self.tp(self.MockRawIO(), w)
1317
1318 pair.write(b"abc")
1319 pair.flush()
1320 pair.write(b"def")
1321 pair.flush()
1322 self.assertEqual(w._write_stack, [b"abc", b"def"])
1323
1324 def test_peek(self):
1325 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1326
1327 self.assertTrue(pair.peek(3).startswith(b"abc"))
1328 self.assertEqual(pair.read(3), b"abc")
1329
1330 def test_readable(self):
1331 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1332 self.assertTrue(pair.readable())
1333
1334 def test_writeable(self):
1335 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1336 self.assertTrue(pair.writable())
1337
1338 def test_seekable(self):
1339 # BufferedRWPairs are never seekable, even if their readers and writers
1340 # are.
1341 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1342 self.assertFalse(pair.seekable())
1343
1344 # .flush() is delegated to the underlying writer object and has been
1345 # tested in the test_write method.
1346
1347 def test_close_and_closed(self):
1348 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1349 self.assertFalse(pair.closed)
1350 pair.close()
1351 self.assertTrue(pair.closed)
1352
1353 def test_isatty(self):
1354 class SelectableIsAtty(MockRawIO):
1355 def __init__(self, isatty):
1356 MockRawIO.__init__(self)
1357 self._isatty = isatty
1358
1359 def isatty(self):
1360 return self._isatty
1361
1362 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1363 self.assertFalse(pair.isatty())
1364
1365 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1366 self.assertTrue(pair.isatty())
1367
1368 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1369 self.assertTrue(pair.isatty())
1370
1371 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1372 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001373
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001374class CBufferedRWPairTest(BufferedRWPairTest):
1375 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001376
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001377class PyBufferedRWPairTest(BufferedRWPairTest):
1378 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001379
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001380
1381class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1382 read_mode = "rb+"
1383 write_mode = "wb+"
1384
1385 def test_constructor(self):
1386 BufferedReaderTest.test_constructor(self)
1387 BufferedWriterTest.test_constructor(self)
1388
1389 def test_read_and_write(self):
1390 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001391 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001392
1393 self.assertEqual(b"as", rw.read(2))
1394 rw.write(b"ddd")
1395 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001396 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001397 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001398 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001399
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001400 def test_seek_and_tell(self):
1401 raw = self.BytesIO(b"asdfghjkl")
1402 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001403
Ezio Melottib3aedd42010-11-20 19:04:17 +00001404 self.assertEqual(b"as", rw.read(2))
1405 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001406 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001407 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001408
Antoine Pitroue05565e2011-08-20 14:39:23 +02001409 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001410 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001411 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001412 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001413 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001414 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001415 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001416 self.assertEqual(7, rw.tell())
1417 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001418 rw.flush()
1419 self.assertEqual(b"asdf123fl", raw.getvalue())
1420
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001421 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001422
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001423 def check_flush_and_read(self, read_func):
1424 raw = self.BytesIO(b"abcdefghi")
1425 bufio = self.tp(raw)
1426
Ezio Melottib3aedd42010-11-20 19:04:17 +00001427 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001428 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001429 self.assertEqual(b"ef", read_func(bufio, 2))
1430 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001431 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001432 self.assertEqual(6, bufio.tell())
1433 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001434 raw.seek(0, 0)
1435 raw.write(b"XYZ")
1436 # flush() resets the read buffer
1437 bufio.flush()
1438 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001439 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001440
1441 def test_flush_and_read(self):
1442 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1443
1444 def test_flush_and_readinto(self):
1445 def _readinto(bufio, n=-1):
1446 b = bytearray(n if n >= 0 else 9999)
1447 n = bufio.readinto(b)
1448 return bytes(b[:n])
1449 self.check_flush_and_read(_readinto)
1450
1451 def test_flush_and_peek(self):
1452 def _peek(bufio, n=-1):
1453 # This relies on the fact that the buffer can contain the whole
1454 # raw stream, otherwise peek() can return less.
1455 b = bufio.peek(n)
1456 if n != -1:
1457 b = b[:n]
1458 bufio.seek(len(b), 1)
1459 return b
1460 self.check_flush_and_read(_peek)
1461
1462 def test_flush_and_write(self):
1463 raw = self.BytesIO(b"abcdefghi")
1464 bufio = self.tp(raw)
1465
1466 bufio.write(b"123")
1467 bufio.flush()
1468 bufio.write(b"45")
1469 bufio.flush()
1470 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001471 self.assertEqual(b"12345fghi", raw.getvalue())
1472 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001473
1474 def test_threads(self):
1475 BufferedReaderTest.test_threads(self)
1476 BufferedWriterTest.test_threads(self)
1477
1478 def test_writes_and_peek(self):
1479 def _peek(bufio):
1480 bufio.peek(1)
1481 self.check_writes(_peek)
1482 def _peek(bufio):
1483 pos = bufio.tell()
1484 bufio.seek(-1, 1)
1485 bufio.peek(1)
1486 bufio.seek(pos, 0)
1487 self.check_writes(_peek)
1488
1489 def test_writes_and_reads(self):
1490 def _read(bufio):
1491 bufio.seek(-1, 1)
1492 bufio.read(1)
1493 self.check_writes(_read)
1494
1495 def test_writes_and_read1s(self):
1496 def _read1(bufio):
1497 bufio.seek(-1, 1)
1498 bufio.read1(1)
1499 self.check_writes(_read1)
1500
1501 def test_writes_and_readintos(self):
1502 def _read(bufio):
1503 bufio.seek(-1, 1)
1504 bufio.readinto(bytearray(1))
1505 self.check_writes(_read)
1506
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001507 def test_write_after_readahead(self):
1508 # Issue #6629: writing after the buffer was filled by readahead should
1509 # first rewind the raw stream.
1510 for overwrite_size in [1, 5]:
1511 raw = self.BytesIO(b"A" * 10)
1512 bufio = self.tp(raw, 4)
1513 # Trigger readahead
1514 self.assertEqual(bufio.read(1), b"A")
1515 self.assertEqual(bufio.tell(), 1)
1516 # Overwriting should rewind the raw stream if it needs so
1517 bufio.write(b"B" * overwrite_size)
1518 self.assertEqual(bufio.tell(), overwrite_size + 1)
1519 # If the write size was smaller than the buffer size, flush() and
1520 # check that rewind happens.
1521 bufio.flush()
1522 self.assertEqual(bufio.tell(), overwrite_size + 1)
1523 s = raw.getvalue()
1524 self.assertEqual(s,
1525 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1526
Antoine Pitrou7c404892011-05-13 00:13:33 +02001527 def test_write_rewind_write(self):
1528 # Various combinations of reading / writing / seeking backwards / writing again
1529 def mutate(bufio, pos1, pos2):
1530 assert pos2 >= pos1
1531 # Fill the buffer
1532 bufio.seek(pos1)
1533 bufio.read(pos2 - pos1)
1534 bufio.write(b'\x02')
1535 # This writes earlier than the previous write, but still inside
1536 # the buffer.
1537 bufio.seek(pos1)
1538 bufio.write(b'\x01')
1539
1540 b = b"\x80\x81\x82\x83\x84"
1541 for i in range(0, len(b)):
1542 for j in range(i, len(b)):
1543 raw = self.BytesIO(b)
1544 bufio = self.tp(raw, 100)
1545 mutate(bufio, i, j)
1546 bufio.flush()
1547 expected = bytearray(b)
1548 expected[j] = 2
1549 expected[i] = 1
1550 self.assertEqual(raw.getvalue(), expected,
1551 "failed result for i=%d, j=%d" % (i, j))
1552
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001553 def test_truncate_after_read_or_write(self):
1554 raw = self.BytesIO(b"A" * 10)
1555 bufio = self.tp(raw, 100)
1556 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1557 self.assertEqual(bufio.truncate(), 2)
1558 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1559 self.assertEqual(bufio.truncate(), 4)
1560
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001561 def test_misbehaved_io(self):
1562 BufferedReaderTest.test_misbehaved_io(self)
1563 BufferedWriterTest.test_misbehaved_io(self)
1564
Antoine Pitroue05565e2011-08-20 14:39:23 +02001565 def test_interleaved_read_write(self):
1566 # Test for issue #12213
1567 with self.BytesIO(b'abcdefgh') as raw:
1568 with self.tp(raw, 100) as f:
1569 f.write(b"1")
1570 self.assertEqual(f.read(1), b'b')
1571 f.write(b'2')
1572 self.assertEqual(f.read1(1), b'd')
1573 f.write(b'3')
1574 buf = bytearray(1)
1575 f.readinto(buf)
1576 self.assertEqual(buf, b'f')
1577 f.write(b'4')
1578 self.assertEqual(f.peek(1), b'h')
1579 f.flush()
1580 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1581
1582 with self.BytesIO(b'abc') as raw:
1583 with self.tp(raw, 100) as f:
1584 self.assertEqual(f.read(1), b'a')
1585 f.write(b"2")
1586 self.assertEqual(f.read(1), b'c')
1587 f.flush()
1588 self.assertEqual(raw.getvalue(), b'a2c')
1589
1590 def test_interleaved_readline_write(self):
1591 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1592 with self.tp(raw) as f:
1593 f.write(b'1')
1594 self.assertEqual(f.readline(), b'b\n')
1595 f.write(b'2')
1596 self.assertEqual(f.readline(), b'def\n')
1597 f.write(b'3')
1598 self.assertEqual(f.readline(), b'\n')
1599 f.flush()
1600 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1601
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001602 # You can't construct a BufferedRandom over a non-seekable stream.
1603 test_unseekable = None
1604
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001605class CBufferedRandomTest(BufferedRandomTest):
1606 tp = io.BufferedRandom
1607
1608 def test_constructor(self):
1609 BufferedRandomTest.test_constructor(self)
1610 # The allocation can succeed on 32-bit builds, e.g. with more
1611 # than 2GB RAM and a 64-bit kernel.
1612 if sys.maxsize > 0x7FFFFFFF:
1613 rawio = self.MockRawIO()
1614 bufio = self.tp(rawio)
1615 self.assertRaises((OverflowError, MemoryError, ValueError),
1616 bufio.__init__, rawio, sys.maxsize)
1617
1618 def test_garbage_collection(self):
1619 CBufferedReaderTest.test_garbage_collection(self)
1620 CBufferedWriterTest.test_garbage_collection(self)
1621
1622class PyBufferedRandomTest(BufferedRandomTest):
1623 tp = pyio.BufferedRandom
1624
1625
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001626# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1627# properties:
1628# - A single output character can correspond to many bytes of input.
1629# - The number of input bytes to complete the character can be
1630# undetermined until the last input byte is received.
1631# - The number of input bytes can vary depending on previous input.
1632# - A single input byte can correspond to many characters of output.
1633# - The number of output characters can be undetermined until the
1634# last input byte is received.
1635# - The number of output characters can vary depending on previous input.
1636
1637class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1638 """
1639 For testing seek/tell behavior with a stateful, buffering decoder.
1640
1641 Input is a sequence of words. Words may be fixed-length (length set
1642 by input) or variable-length (period-terminated). In variable-length
1643 mode, extra periods are ignored. Possible words are:
1644 - 'i' followed by a number sets the input length, I (maximum 99).
1645 When I is set to 0, words are space-terminated.
1646 - 'o' followed by a number sets the output length, O (maximum 99).
1647 - Any other word is converted into a word followed by a period on
1648 the output. The output word consists of the input word truncated
1649 or padded out with hyphens to make its length equal to O. If O
1650 is 0, the word is output verbatim without truncating or padding.
1651 I and O are initially set to 1. When I changes, any buffered input is
1652 re-scanned according to the new I. EOF also terminates the last word.
1653 """
1654
1655 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001656 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001657 self.reset()
1658
1659 def __repr__(self):
1660 return '<SID %x>' % id(self)
1661
1662 def reset(self):
1663 self.i = 1
1664 self.o = 1
1665 self.buffer = bytearray()
1666
1667 def getstate(self):
1668 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1669 return bytes(self.buffer), i*100 + o
1670
1671 def setstate(self, state):
1672 buffer, io = state
1673 self.buffer = bytearray(buffer)
1674 i, o = divmod(io, 100)
1675 self.i, self.o = i ^ 1, o ^ 1
1676
1677 def decode(self, input, final=False):
1678 output = ''
1679 for b in input:
1680 if self.i == 0: # variable-length, terminated with period
1681 if b == ord('.'):
1682 if self.buffer:
1683 output += self.process_word()
1684 else:
1685 self.buffer.append(b)
1686 else: # fixed-length, terminate after self.i bytes
1687 self.buffer.append(b)
1688 if len(self.buffer) == self.i:
1689 output += self.process_word()
1690 if final and self.buffer: # EOF terminates the last word
1691 output += self.process_word()
1692 return output
1693
1694 def process_word(self):
1695 output = ''
1696 if self.buffer[0] == ord('i'):
1697 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1698 elif self.buffer[0] == ord('o'):
1699 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1700 else:
1701 output = self.buffer.decode('ascii')
1702 if len(output) < self.o:
1703 output += '-'*self.o # pad out with hyphens
1704 if self.o:
1705 output = output[:self.o] # truncate to output length
1706 output += '.'
1707 self.buffer = bytearray()
1708 return output
1709
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001710 codecEnabled = False
1711
1712 @classmethod
1713 def lookupTestDecoder(cls, name):
1714 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001715 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001716 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001717 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001718 incrementalencoder=None,
1719 streamreader=None, streamwriter=None,
1720 incrementaldecoder=cls)
1721
1722# Register the previous decoder for testing.
1723# Disabled by default, tests will enable it.
1724codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1725
1726
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001727class StatefulIncrementalDecoderTest(unittest.TestCase):
1728 """
1729 Make sure the StatefulIncrementalDecoder actually works.
1730 """
1731
1732 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001733 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001734 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001735 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001736 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001737 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001738 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001739 # I=0, O=6 (variable-length input, fixed-length output)
1740 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1741 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001742 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001743 # I=6, O=3 (fixed-length input > fixed-length output)
1744 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1745 # I=0, then 3; O=29, then 15 (with longer output)
1746 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1747 'a----------------------------.' +
1748 'b----------------------------.' +
1749 'cde--------------------------.' +
1750 'abcdefghijabcde.' +
1751 'a.b------------.' +
1752 '.c.------------.' +
1753 'd.e------------.' +
1754 'k--------------.' +
1755 'l--------------.' +
1756 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001757 ]
1758
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001759 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001760 # Try a few one-shot test cases.
1761 for input, eof, output in self.test_cases:
1762 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001763 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001764
1765 # Also test an unfinished decode, followed by forcing EOF.
1766 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001767 self.assertEqual(d.decode(b'oiabcd'), '')
1768 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001769
1770class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001771
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001772 def setUp(self):
1773 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1774 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001775 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001776
Guido van Rossumd0712812007-04-11 16:32:43 +00001777 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001778 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001779
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001780 def test_constructor(self):
1781 r = self.BytesIO(b"\xc3\xa9\n\n")
1782 b = self.BufferedReader(r, 1000)
1783 t = self.TextIOWrapper(b)
1784 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001785 self.assertEqual(t.encoding, "latin1")
1786 self.assertEqual(t.line_buffering, False)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001787 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001788 self.assertEqual(t.encoding, "utf8")
1789 self.assertEqual(t.line_buffering, True)
1790 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001791 self.assertRaises(TypeError, t.__init__, b, newline=42)
1792 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1793
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001794 def test_detach(self):
1795 r = self.BytesIO()
1796 b = self.BufferedWriter(r)
1797 t = self.TextIOWrapper(b)
1798 self.assertIs(t.detach(), b)
1799
1800 t = self.TextIOWrapper(b, encoding="ascii")
1801 t.write("howdy")
1802 self.assertFalse(r.getvalue())
1803 t.detach()
1804 self.assertEqual(r.getvalue(), b"howdy")
1805 self.assertRaises(ValueError, t.detach)
1806
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001807 def test_repr(self):
1808 raw = self.BytesIO("hello".encode("utf-8"))
1809 b = self.BufferedReader(raw)
1810 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001811 modname = self.TextIOWrapper.__module__
1812 self.assertEqual(repr(t),
1813 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1814 raw.name = "dummy"
1815 self.assertEqual(repr(t),
1816 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001817 t.mode = "r"
1818 self.assertEqual(repr(t),
1819 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001820 raw.name = b"dummy"
1821 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001822 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001823
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001824 def test_line_buffering(self):
1825 r = self.BytesIO()
1826 b = self.BufferedWriter(r, 1000)
1827 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001828 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001829 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001830 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001831 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001832 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001833 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001834
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001835 def test_encoding(self):
1836 # Check the encoding attribute is always set, and valid
1837 b = self.BytesIO()
1838 t = self.TextIOWrapper(b, encoding="utf8")
1839 self.assertEqual(t.encoding, "utf8")
1840 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001841 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001842 codecs.lookup(t.encoding)
1843
1844 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001845 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001846 b = self.BytesIO(b"abc\n\xff\n")
1847 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001848 self.assertRaises(UnicodeError, t.read)
1849 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001850 b = self.BytesIO(b"abc\n\xff\n")
1851 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001852 self.assertRaises(UnicodeError, t.read)
1853 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001854 b = self.BytesIO(b"abc\n\xff\n")
1855 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001856 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001857 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001858 b = self.BytesIO(b"abc\n\xff\n")
1859 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001860 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001861
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001862 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001863 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001864 b = self.BytesIO()
1865 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001866 self.assertRaises(UnicodeError, t.write, "\xff")
1867 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001868 b = self.BytesIO()
1869 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001870 self.assertRaises(UnicodeError, t.write, "\xff")
1871 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001872 b = self.BytesIO()
1873 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001874 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001875 t.write("abc\xffdef\n")
1876 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001877 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001878 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001879 b = self.BytesIO()
1880 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001881 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001882 t.write("abc\xffdef\n")
1883 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001884 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001885
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001886 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001887 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1888
1889 tests = [
1890 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001891 [ '', input_lines ],
1892 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1893 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1894 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001895 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001896 encodings = (
1897 'utf-8', 'latin-1',
1898 'utf-16', 'utf-16-le', 'utf-16-be',
1899 'utf-32', 'utf-32-le', 'utf-32-be',
1900 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001901
Guido van Rossum8358db22007-08-18 21:39:55 +00001902 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001903 # character in TextIOWrapper._pending_line.
1904 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001905 # XXX: str.encode() should return bytes
1906 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001907 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001908 for bufsize in range(1, 10):
1909 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001910 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1911 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001912 encoding=encoding)
1913 if do_reads:
1914 got_lines = []
1915 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001916 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001917 if c2 == '':
1918 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00001919 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001920 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001921 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001922 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001923
1924 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001925 self.assertEqual(got_line, exp_line)
1926 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001927
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001928 def test_newlines_input(self):
1929 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001930 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1931 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001932 (None, normalized.decode("ascii").splitlines(True)),
1933 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001934 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1935 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1936 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001937 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001938 buf = self.BytesIO(testdata)
1939 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001940 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00001941 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001942 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001943
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001944 def test_newlines_output(self):
1945 testdict = {
1946 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1947 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1948 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1949 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1950 }
1951 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1952 for newline, expected in tests:
1953 buf = self.BytesIO()
1954 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1955 txt.write("AAA\nB")
1956 txt.write("BB\nCCC\n")
1957 txt.write("X\rY\r\nZ")
1958 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001959 self.assertEqual(buf.closed, False)
1960 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001961
1962 def test_destructor(self):
1963 l = []
1964 base = self.BytesIO
1965 class MyBytesIO(base):
1966 def close(self):
1967 l.append(self.getvalue())
1968 base.close(self)
1969 b = MyBytesIO()
1970 t = self.TextIOWrapper(b, encoding="ascii")
1971 t.write("abc")
1972 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001973 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001974 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001975
1976 def test_override_destructor(self):
1977 record = []
1978 class MyTextIO(self.TextIOWrapper):
1979 def __del__(self):
1980 record.append(1)
1981 try:
1982 f = super().__del__
1983 except AttributeError:
1984 pass
1985 else:
1986 f()
1987 def close(self):
1988 record.append(2)
1989 super().close()
1990 def flush(self):
1991 record.append(3)
1992 super().flush()
1993 b = self.BytesIO()
1994 t = MyTextIO(b, encoding="ascii")
1995 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001996 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001997 self.assertEqual(record, [1, 2, 3])
1998
1999 def test_error_through_destructor(self):
2000 # Test that the exception state is not modified by a destructor,
2001 # even if close() fails.
2002 rawio = self.CloseFailureIO()
2003 def f():
2004 self.TextIOWrapper(rawio).xyzzy
2005 with support.captured_output("stderr") as s:
2006 self.assertRaises(AttributeError, f)
2007 s = s.getvalue().strip()
2008 if s:
2009 # The destructor *may* have printed an unraisable error, check it
2010 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002011 self.assertTrue(s.startswith("Exception IOError: "), s)
2012 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002013
Guido van Rossum9b76da62007-04-11 01:09:03 +00002014 # Systematic tests of the text I/O API
2015
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002016 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002017 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2018 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002019 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002020 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002021 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002022 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002023 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002024 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002025 self.assertEqual(f.tell(), 0)
2026 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002027 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002028 self.assertEqual(f.seek(0), 0)
2029 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002030 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002031 self.assertEqual(f.read(2), "ab")
2032 self.assertEqual(f.read(1), "c")
2033 self.assertEqual(f.read(1), "")
2034 self.assertEqual(f.read(), "")
2035 self.assertEqual(f.tell(), cookie)
2036 self.assertEqual(f.seek(0), 0)
2037 self.assertEqual(f.seek(0, 2), cookie)
2038 self.assertEqual(f.write("def"), 3)
2039 self.assertEqual(f.seek(cookie), cookie)
2040 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002041 if enc.startswith("utf"):
2042 self.multi_line_test(f, enc)
2043 f.close()
2044
2045 def multi_line_test(self, f, enc):
2046 f.seek(0)
2047 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002048 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002049 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002050 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002051 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002052 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002053 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002054 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002055 wlines.append((f.tell(), line))
2056 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002057 f.seek(0)
2058 rlines = []
2059 while True:
2060 pos = f.tell()
2061 line = f.readline()
2062 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002063 break
2064 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002065 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002066
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002067 def test_telling(self):
2068 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002069 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002070 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002071 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002072 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002073 p2 = f.tell()
2074 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002075 self.assertEqual(f.tell(), p0)
2076 self.assertEqual(f.readline(), "\xff\n")
2077 self.assertEqual(f.tell(), p1)
2078 self.assertEqual(f.readline(), "\xff\n")
2079 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002080 f.seek(0)
2081 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002082 self.assertEqual(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002083 self.assertRaises(IOError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002084 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002085 f.close()
2086
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002087 def test_seeking(self):
2088 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002089 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002090 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002091 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002092 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002093 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002094 suffix = bytes(u_suffix.encode("utf-8"))
2095 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002096 with self.open(support.TESTFN, "wb") as f:
2097 f.write(line*2)
2098 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2099 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002100 self.assertEqual(s, str(prefix, "ascii"))
2101 self.assertEqual(f.tell(), prefix_size)
2102 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002103
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002104 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002105 # Regression test for a specific bug
2106 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002107 with self.open(support.TESTFN, "wb") as f:
2108 f.write(data)
2109 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2110 f._CHUNK_SIZE # Just test that it exists
2111 f._CHUNK_SIZE = 2
2112 f.readline()
2113 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002114
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002115 def test_seek_and_tell(self):
2116 #Test seek/tell using the StatefulIncrementalDecoder.
2117 # Make test faster by doing smaller seeks
2118 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002119
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002120 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002121 """Tell/seek to various points within a data stream and ensure
2122 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002123 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002124 f.write(data)
2125 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002126 f = self.open(support.TESTFN, encoding='test_decoder')
2127 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002128 decoded = f.read()
2129 f.close()
2130
Neal Norwitze2b07052008-03-18 19:52:05 +00002131 for i in range(min_pos, len(decoded) + 1): # seek positions
2132 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002133 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002134 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002135 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002136 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002137 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002138 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002139 f.close()
2140
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002141 # Enable the test decoder.
2142 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002143
2144 # Run the tests.
2145 try:
2146 # Try each test case.
2147 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002148 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002149
2150 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002151 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2152 offset = CHUNK_SIZE - len(input)//2
2153 prefix = b'.'*offset
2154 # Don't bother seeking into the prefix (takes too long).
2155 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002156 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002157
2158 # Ensure our test decoder won't interfere with subsequent tests.
2159 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002160 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002161
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002162 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002163 data = "1234567890"
2164 tests = ("utf-16",
2165 "utf-16-le",
2166 "utf-16-be",
2167 "utf-32",
2168 "utf-32-le",
2169 "utf-32-be")
2170 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002171 buf = self.BytesIO()
2172 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002173 # Check if the BOM is written only once (see issue1753).
2174 f.write(data)
2175 f.write(data)
2176 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002177 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002178 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002179 self.assertEqual(f.read(), data * 2)
2180 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002181
Benjamin Petersona1b49012009-03-31 23:11:32 +00002182 def test_unreadable(self):
2183 class UnReadable(self.BytesIO):
2184 def readable(self):
2185 return False
2186 txt = self.TextIOWrapper(UnReadable())
2187 self.assertRaises(IOError, txt.read)
2188
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002189 def test_read_one_by_one(self):
2190 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002191 reads = ""
2192 while True:
2193 c = txt.read(1)
2194 if not c:
2195 break
2196 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002197 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002198
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002199 def test_readlines(self):
2200 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2201 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2202 txt.seek(0)
2203 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2204 txt.seek(0)
2205 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2206
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002207 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002208 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002209 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002210 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002211 reads = ""
2212 while True:
2213 c = txt.read(128)
2214 if not c:
2215 break
2216 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002217 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002218
2219 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002220 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002221
2222 # read one char at a time
2223 reads = ""
2224 while True:
2225 c = txt.read(1)
2226 if not c:
2227 break
2228 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002229 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002230
2231 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002232 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002233 txt._CHUNK_SIZE = 4
2234
2235 reads = ""
2236 while True:
2237 c = txt.read(4)
2238 if not c:
2239 break
2240 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002241 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002242
2243 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002244 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002245 txt._CHUNK_SIZE = 4
2246
2247 reads = txt.read(4)
2248 reads += txt.read(4)
2249 reads += txt.readline()
2250 reads += txt.readline()
2251 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002252 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002253
2254 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002255 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002256 txt._CHUNK_SIZE = 4
2257
2258 reads = txt.read(4)
2259 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002260 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002261
2262 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002263 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002264 txt._CHUNK_SIZE = 4
2265
2266 reads = txt.read(4)
2267 pos = txt.tell()
2268 txt.seek(0)
2269 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002270 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002271
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002272 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002273 buffer = self.BytesIO(self.testdata)
2274 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002275
2276 self.assertEqual(buffer.seekable(), txt.seekable())
2277
Antoine Pitroue4501852009-05-14 18:55:55 +00002278 def test_append_bom(self):
2279 # The BOM is not written again when appending to a non-empty file
2280 filename = support.TESTFN
2281 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2282 with self.open(filename, 'w', encoding=charset) as f:
2283 f.write('aaa')
2284 pos = f.tell()
2285 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002286 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002287
2288 with self.open(filename, 'a', encoding=charset) as f:
2289 f.write('xxx')
2290 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002291 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002292
2293 def test_seek_bom(self):
2294 # Same test, but when seeking manually
2295 filename = support.TESTFN
2296 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2297 with self.open(filename, 'w', encoding=charset) as f:
2298 f.write('aaa')
2299 pos = f.tell()
2300 with self.open(filename, 'r+', encoding=charset) as f:
2301 f.seek(pos)
2302 f.write('zzz')
2303 f.seek(0)
2304 f.write('bbb')
2305 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002306 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002307
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002308 def test_errors_property(self):
2309 with self.open(support.TESTFN, "w") as f:
2310 self.assertEqual(f.errors, "strict")
2311 with self.open(support.TESTFN, "w", errors="replace") as f:
2312 self.assertEqual(f.errors, "replace")
2313
Victor Stinner45df8202010-04-28 22:31:17 +00002314 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002315 def test_threads_write(self):
2316 # Issue6750: concurrent writes could duplicate data
2317 event = threading.Event()
2318 with self.open(support.TESTFN, "w", buffering=1) as f:
2319 def run(n):
2320 text = "Thread%03d\n" % n
2321 event.wait()
2322 f.write(text)
2323 threads = [threading.Thread(target=lambda n=x: run(n))
2324 for x in range(20)]
2325 for t in threads:
2326 t.start()
2327 time.sleep(0.02)
2328 event.set()
2329 for t in threads:
2330 t.join()
2331 with self.open(support.TESTFN) as f:
2332 content = f.read()
2333 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002334 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002335
Antoine Pitrou6be88762010-05-03 16:48:20 +00002336 def test_flush_error_on_close(self):
2337 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2338 def bad_flush():
2339 raise IOError()
2340 txt.flush = bad_flush
2341 self.assertRaises(IOError, txt.close) # exception not swallowed
2342
2343 def test_multi_close(self):
2344 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2345 txt.close()
2346 txt.close()
2347 txt.close()
2348 self.assertRaises(ValueError, txt.flush)
2349
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002350 def test_unseekable(self):
2351 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2352 self.assertRaises(self.UnsupportedOperation, txt.tell)
2353 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2354
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002355 def test_readonly_attributes(self):
2356 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2357 buf = self.BytesIO(self.testdata)
2358 with self.assertRaises(AttributeError):
2359 txt.buffer = buf
2360
Antoine Pitroue96ec682011-07-23 21:46:35 +02002361 def test_rawio(self):
2362 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2363 # that subprocess.Popen() can have the required unbuffered
2364 # semantics with universal_newlines=True.
2365 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2366 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2367 # Reads
2368 self.assertEqual(txt.read(4), 'abcd')
2369 self.assertEqual(txt.readline(), 'efghi\n')
2370 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2371
2372 def test_rawio_write_through(self):
2373 # Issue #12591: with write_through=True, writes don't need a flush
2374 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2375 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2376 write_through=True)
2377 txt.write('1')
2378 txt.write('23\n4')
2379 txt.write('5')
2380 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2381
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002382class CTextIOWrapperTest(TextIOWrapperTest):
2383
2384 def test_initialization(self):
2385 r = self.BytesIO(b"\xc3\xa9\n\n")
2386 b = self.BufferedReader(r, 1000)
2387 t = self.TextIOWrapper(b)
2388 self.assertRaises(TypeError, t.__init__, b, newline=42)
2389 self.assertRaises(ValueError, t.read)
2390 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2391 self.assertRaises(ValueError, t.read)
2392
2393 def test_garbage_collection(self):
2394 # C TextIOWrapper objects are collected, and collecting them flushes
2395 # all data to disk.
2396 # The Python version has __del__, so it ends in gc.garbage instead.
2397 rawio = io.FileIO(support.TESTFN, "wb")
2398 b = self.BufferedWriter(rawio)
2399 t = self.TextIOWrapper(b, encoding="ascii")
2400 t.write("456def")
2401 t.x = t
2402 wr = weakref.ref(t)
2403 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002404 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002405 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002406 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002407 self.assertEqual(f.read(), b"456def")
2408
2409class PyTextIOWrapperTest(TextIOWrapperTest):
2410 pass
2411
2412
2413class IncrementalNewlineDecoderTest(unittest.TestCase):
2414
2415 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002416 # UTF-8 specific tests for a newline decoder
2417 def _check_decode(b, s, **kwargs):
2418 # We exercise getstate() / setstate() as well as decode()
2419 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002420 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002421 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002422 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002423
Antoine Pitrou180a3362008-12-14 16:36:46 +00002424 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002425
Antoine Pitrou180a3362008-12-14 16:36:46 +00002426 _check_decode(b'\xe8', "")
2427 _check_decode(b'\xa2', "")
2428 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002429
Antoine Pitrou180a3362008-12-14 16:36:46 +00002430 _check_decode(b'\xe8', "")
2431 _check_decode(b'\xa2', "")
2432 _check_decode(b'\x88', "\u8888")
2433
2434 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002435 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2436
Antoine Pitrou180a3362008-12-14 16:36:46 +00002437 decoder.reset()
2438 _check_decode(b'\n', "\n")
2439 _check_decode(b'\r', "")
2440 _check_decode(b'', "\n", final=True)
2441 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002442
Antoine Pitrou180a3362008-12-14 16:36:46 +00002443 _check_decode(b'\r', "")
2444 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002445
Antoine Pitrou180a3362008-12-14 16:36:46 +00002446 _check_decode(b'\r\r\n', "\n\n")
2447 _check_decode(b'\r', "")
2448 _check_decode(b'\r', "\n")
2449 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002450
Antoine Pitrou180a3362008-12-14 16:36:46 +00002451 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2452 _check_decode(b'\xe8\xa2\x88', "\u8888")
2453 _check_decode(b'\n', "\n")
2454 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2455 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002456
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002457 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002458 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002459 if encoding is not None:
2460 encoder = codecs.getincrementalencoder(encoding)()
2461 def _decode_bytewise(s):
2462 # Decode one byte at a time
2463 for b in encoder.encode(s):
2464 result.append(decoder.decode(bytes([b])))
2465 else:
2466 encoder = None
2467 def _decode_bytewise(s):
2468 # Decode one char at a time
2469 for c in s:
2470 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002471 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002472 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002473 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002474 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002475 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002476 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002477 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002478 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002479 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002480 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002481 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002482 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002483 input = "abc"
2484 if encoder is not None:
2485 encoder.reset()
2486 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002487 self.assertEqual(decoder.decode(input), "abc")
2488 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002489
2490 def test_newline_decoder(self):
2491 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002492 # None meaning the IncrementalNewlineDecoder takes unicode input
2493 # rather than bytes input
2494 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002495 'utf-16', 'utf-16-le', 'utf-16-be',
2496 'utf-32', 'utf-32-le', 'utf-32-be',
2497 )
2498 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002499 decoder = enc and codecs.getincrementaldecoder(enc)()
2500 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2501 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002502 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002503 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2504 self.check_newline_decoding_utf8(decoder)
2505
Antoine Pitrou66913e22009-03-06 23:40:56 +00002506 def test_newline_bytes(self):
2507 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2508 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002509 self.assertEqual(dec.newlines, None)
2510 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2511 self.assertEqual(dec.newlines, None)
2512 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2513 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002514 dec = self.IncrementalNewlineDecoder(None, translate=False)
2515 _check(dec)
2516 dec = self.IncrementalNewlineDecoder(None, translate=True)
2517 _check(dec)
2518
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002519class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2520 pass
2521
2522class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2523 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002524
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002525
Guido van Rossum01a27522007-03-07 01:00:12 +00002526# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002527
Guido van Rossum5abbf752007-08-27 17:39:33 +00002528class MiscIOTest(unittest.TestCase):
2529
Barry Warsaw40e82462008-11-20 20:14:50 +00002530 def tearDown(self):
2531 support.unlink(support.TESTFN)
2532
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002533 def test___all__(self):
2534 for name in self.io.__all__:
2535 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002536 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002537 if name == "open":
2538 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002539 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002540 self.assertTrue(issubclass(obj, Exception), name)
2541 elif not name.startswith("SEEK_"):
2542 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002543
Barry Warsaw40e82462008-11-20 20:14:50 +00002544 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002545 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002546 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002547 f.close()
2548
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002549 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002550 self.assertEqual(f.name, support.TESTFN)
2551 self.assertEqual(f.buffer.name, support.TESTFN)
2552 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2553 self.assertEqual(f.mode, "U")
2554 self.assertEqual(f.buffer.mode, "rb")
2555 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002556 f.close()
2557
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002558 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002559 self.assertEqual(f.mode, "w+")
2560 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2561 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002562
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002563 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002564 self.assertEqual(g.mode, "wb")
2565 self.assertEqual(g.raw.mode, "wb")
2566 self.assertEqual(g.name, f.fileno())
2567 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002568 f.close()
2569 g.close()
2570
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002571 def test_io_after_close(self):
2572 for kwargs in [
2573 {"mode": "w"},
2574 {"mode": "wb"},
2575 {"mode": "w", "buffering": 1},
2576 {"mode": "w", "buffering": 2},
2577 {"mode": "wb", "buffering": 0},
2578 {"mode": "r"},
2579 {"mode": "rb"},
2580 {"mode": "r", "buffering": 1},
2581 {"mode": "r", "buffering": 2},
2582 {"mode": "rb", "buffering": 0},
2583 {"mode": "w+"},
2584 {"mode": "w+b"},
2585 {"mode": "w+", "buffering": 1},
2586 {"mode": "w+", "buffering": 2},
2587 {"mode": "w+b", "buffering": 0},
2588 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002589 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002590 f.close()
2591 self.assertRaises(ValueError, f.flush)
2592 self.assertRaises(ValueError, f.fileno)
2593 self.assertRaises(ValueError, f.isatty)
2594 self.assertRaises(ValueError, f.__iter__)
2595 if hasattr(f, "peek"):
2596 self.assertRaises(ValueError, f.peek, 1)
2597 self.assertRaises(ValueError, f.read)
2598 if hasattr(f, "read1"):
2599 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002600 if hasattr(f, "readall"):
2601 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002602 if hasattr(f, "readinto"):
2603 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2604 self.assertRaises(ValueError, f.readline)
2605 self.assertRaises(ValueError, f.readlines)
2606 self.assertRaises(ValueError, f.seek, 0)
2607 self.assertRaises(ValueError, f.tell)
2608 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002609 self.assertRaises(ValueError, f.write,
2610 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002611 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002612 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002613
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002614 def test_blockingioerror(self):
2615 # Various BlockingIOError issues
2616 self.assertRaises(TypeError, self.BlockingIOError)
2617 self.assertRaises(TypeError, self.BlockingIOError, 1)
2618 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2619 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2620 b = self.BlockingIOError(1, "")
2621 self.assertEqual(b.characters_written, 0)
2622 class C(str):
2623 pass
2624 c = C("")
2625 b = self.BlockingIOError(1, c)
2626 c.b = b
2627 b.c = c
2628 wr = weakref.ref(c)
2629 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002630 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002631 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002632
2633 def test_abcs(self):
2634 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002635 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2636 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2637 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2638 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002639
2640 def _check_abc_inheritance(self, abcmodule):
2641 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002642 self.assertIsInstance(f, abcmodule.IOBase)
2643 self.assertIsInstance(f, abcmodule.RawIOBase)
2644 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2645 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002646 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002647 self.assertIsInstance(f, abcmodule.IOBase)
2648 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2649 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2650 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002651 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002652 self.assertIsInstance(f, abcmodule.IOBase)
2653 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2654 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2655 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002656
2657 def test_abc_inheritance(self):
2658 # Test implementations inherit from their respective ABCs
2659 self._check_abc_inheritance(self)
2660
2661 def test_abc_inheritance_official(self):
2662 # Test implementations inherit from the official ABCs of the
2663 # baseline "io" module.
2664 self._check_abc_inheritance(io)
2665
Antoine Pitroue033e062010-10-29 10:38:18 +00002666 def _check_warn_on_dealloc(self, *args, **kwargs):
2667 f = open(*args, **kwargs)
2668 r = repr(f)
2669 with self.assertWarns(ResourceWarning) as cm:
2670 f = None
2671 support.gc_collect()
2672 self.assertIn(r, str(cm.warning.args[0]))
2673
2674 def test_warn_on_dealloc(self):
2675 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2676 self._check_warn_on_dealloc(support.TESTFN, "wb")
2677 self._check_warn_on_dealloc(support.TESTFN, "w")
2678
2679 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2680 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002681 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002682 for fd in fds:
2683 try:
2684 os.close(fd)
2685 except EnvironmentError as e:
2686 if e.errno != errno.EBADF:
2687 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002688 self.addCleanup(cleanup_fds)
2689 r, w = os.pipe()
2690 fds += r, w
2691 self._check_warn_on_dealloc(r, *args, **kwargs)
2692 # When using closefd=False, there's no warning
2693 r, w = os.pipe()
2694 fds += r, w
2695 with warnings.catch_warnings(record=True) as recorded:
2696 open(r, *args, closefd=False, **kwargs)
2697 support.gc_collect()
2698 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002699
2700 def test_warn_on_dealloc_fd(self):
2701 self._check_warn_on_dealloc_fd("rb", buffering=0)
2702 self._check_warn_on_dealloc_fd("rb")
2703 self._check_warn_on_dealloc_fd("r")
2704
2705
Antoine Pitrou243757e2010-11-05 21:15:39 +00002706 def test_pickling(self):
2707 # Pickling file objects is forbidden
2708 for kwargs in [
2709 {"mode": "w"},
2710 {"mode": "wb"},
2711 {"mode": "wb", "buffering": 0},
2712 {"mode": "r"},
2713 {"mode": "rb"},
2714 {"mode": "rb", "buffering": 0},
2715 {"mode": "w+"},
2716 {"mode": "w+b"},
2717 {"mode": "w+b", "buffering": 0},
2718 ]:
2719 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2720 with self.open(support.TESTFN, **kwargs) as f:
2721 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2722
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002723class CMiscIOTest(MiscIOTest):
2724 io = io
2725
2726class PyMiscIOTest(MiscIOTest):
2727 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002728
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002729
2730@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2731class SignalsTest(unittest.TestCase):
2732
2733 def setUp(self):
2734 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2735
2736 def tearDown(self):
2737 signal.signal(signal.SIGALRM, self.oldalrm)
2738
2739 def alarm_interrupt(self, sig, frame):
2740 1/0
2741
2742 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinnercd1aa0d2011-07-04 11:48:17 +02002743 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2744 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002745 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2746 """Check that a partial write, when it gets interrupted, properly
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002747 invokes the signal handler, and bubbles up the exception raised
2748 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002749 read_results = []
2750 def _read():
2751 s = os.read(r, 1)
2752 read_results.append(s)
2753 t = threading.Thread(target=_read)
2754 t.daemon = True
2755 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00002756 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002757 try:
2758 wio = self.io.open(w, **fdopen_kwargs)
2759 t.start()
2760 signal.alarm(1)
2761 # Fill the pipe enough that the write will be blocking.
2762 # It will be interrupted by the timer armed above. Since the
2763 # other thread has read one byte, the low-level write will
2764 # return with a successful (partial) result rather than an EINTR.
2765 # The buffered IO layer must check for pending signal
2766 # handlers, which in this case will invoke alarm_interrupt().
2767 self.assertRaises(ZeroDivisionError,
2768 wio.write, item * (1024 * 1024))
2769 t.join()
2770 # We got one byte, get another one and check that it isn't a
2771 # repeat of the first one.
2772 read_results.append(os.read(r, 1))
2773 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2774 finally:
2775 os.close(w)
2776 os.close(r)
2777 # This is deliberate. If we didn't close the file descriptor
2778 # before closing wio, wio would try to flush its internal
2779 # buffer, and block again.
2780 try:
2781 wio.close()
2782 except IOError as e:
2783 if e.errno != errno.EBADF:
2784 raise
2785
2786 def test_interrupted_write_unbuffered(self):
2787 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2788
2789 def test_interrupted_write_buffered(self):
2790 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2791
2792 def test_interrupted_write_text(self):
2793 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2794
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002795 def check_reentrant_write(self, data, **fdopen_kwargs):
2796 def on_alarm(*args):
2797 # Will be called reentrantly from the same thread
2798 wio.write(data)
2799 1/0
2800 signal.signal(signal.SIGALRM, on_alarm)
2801 r, w = os.pipe()
2802 wio = self.io.open(w, **fdopen_kwargs)
2803 try:
2804 signal.alarm(1)
2805 # Either the reentrant call to wio.write() fails with RuntimeError,
2806 # or the signal handler raises ZeroDivisionError.
2807 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2808 while 1:
2809 for i in range(100):
2810 wio.write(data)
2811 wio.flush()
2812 # Make sure the buffer doesn't fill up and block further writes
2813 os.read(r, len(data) * 100)
2814 exc = cm.exception
2815 if isinstance(exc, RuntimeError):
2816 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2817 finally:
2818 wio.close()
2819 os.close(r)
2820
2821 def test_reentrant_write_buffered(self):
2822 self.check_reentrant_write(b"xy", mode="wb")
2823
2824 def test_reentrant_write_text(self):
2825 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2826
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002827 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2828 """Check that a buffered read, when it gets interrupted (either
2829 returning a partial result or EINTR), properly invokes the signal
2830 handler and retries if the latter returned successfully."""
2831 r, w = os.pipe()
2832 fdopen_kwargs["closefd"] = False
2833 def alarm_handler(sig, frame):
2834 os.write(w, b"bar")
2835 signal.signal(signal.SIGALRM, alarm_handler)
2836 try:
2837 rio = self.io.open(r, **fdopen_kwargs)
2838 os.write(w, b"foo")
2839 signal.alarm(1)
2840 # Expected behaviour:
2841 # - first raw read() returns partial b"foo"
2842 # - second raw read() returns EINTR
2843 # - third raw read() returns b"bar"
2844 self.assertEqual(decode(rio.read(6)), "foobar")
2845 finally:
2846 rio.close()
2847 os.close(w)
2848 os.close(r)
2849
Antoine Pitrou20db5112011-08-19 20:32:34 +02002850 def test_interrupted_read_retry_buffered(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002851 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2852 mode="rb")
2853
Antoine Pitrou20db5112011-08-19 20:32:34 +02002854 def test_interrupted_read_retry_text(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002855 self.check_interrupted_read_retry(lambda x: x,
2856 mode="r")
2857
2858 @unittest.skipUnless(threading, 'Threading required for this test.')
2859 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2860 """Check that a buffered write, when it gets interrupted (either
2861 returning a partial result or EINTR), properly invokes the signal
2862 handler and retries if the latter returned successfully."""
2863 select = support.import_module("select")
2864 # A quantity that exceeds the buffer size of an anonymous pipe's
2865 # write end.
2866 N = 1024 * 1024
2867 r, w = os.pipe()
2868 fdopen_kwargs["closefd"] = False
2869 # We need a separate thread to read from the pipe and allow the
2870 # write() to finish. This thread is started after the SIGALRM is
2871 # received (forcing a first EINTR in write()).
2872 read_results = []
2873 write_finished = False
2874 def _read():
2875 while not write_finished:
2876 while r in select.select([r], [], [], 1.0)[0]:
2877 s = os.read(r, 1024)
2878 read_results.append(s)
2879 t = threading.Thread(target=_read)
2880 t.daemon = True
2881 def alarm1(sig, frame):
2882 signal.signal(signal.SIGALRM, alarm2)
2883 signal.alarm(1)
2884 def alarm2(sig, frame):
2885 t.start()
2886 signal.signal(signal.SIGALRM, alarm1)
2887 try:
2888 wio = self.io.open(w, **fdopen_kwargs)
2889 signal.alarm(1)
2890 # Expected behaviour:
2891 # - first raw write() is partial (because of the limited pipe buffer
2892 # and the first alarm)
2893 # - second raw write() returns EINTR (because of the second alarm)
2894 # - subsequent write()s are successful (either partial or complete)
2895 self.assertEqual(N, wio.write(item * N))
2896 wio.flush()
2897 write_finished = True
2898 t.join()
2899 self.assertEqual(N, sum(len(x) for x in read_results))
2900 finally:
2901 write_finished = True
2902 os.close(w)
2903 os.close(r)
2904 # This is deliberate. If we didn't close the file descriptor
2905 # before closing wio, wio would try to flush its internal
2906 # buffer, and could block (in case of failure).
2907 try:
2908 wio.close()
2909 except IOError as e:
2910 if e.errno != errno.EBADF:
2911 raise
2912
Antoine Pitrou20db5112011-08-19 20:32:34 +02002913 def test_interrupted_write_retry_buffered(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002914 self.check_interrupted_write_retry(b"x", mode="wb")
2915
Antoine Pitrou20db5112011-08-19 20:32:34 +02002916 def test_interrupted_write_retry_text(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002917 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2918
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002919
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002920class CSignalsTest(SignalsTest):
2921 io = io
2922
2923class PySignalsTest(SignalsTest):
2924 io = pyio
2925
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002926 # Handling reentrancy issues would slow down _pyio even more, so the
2927 # tests are disabled.
2928 test_reentrant_write_buffered = None
2929 test_reentrant_write_text = None
2930
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002931
Guido van Rossum28524c72007-02-27 05:47:44 +00002932def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002933 tests = (CIOTest, PyIOTest,
2934 CBufferedReaderTest, PyBufferedReaderTest,
2935 CBufferedWriterTest, PyBufferedWriterTest,
2936 CBufferedRWPairTest, PyBufferedRWPairTest,
2937 CBufferedRandomTest, PyBufferedRandomTest,
2938 StatefulIncrementalDecoderTest,
2939 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2940 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002941 CMiscIOTest, PyMiscIOTest,
2942 CSignalsTest, PySignalsTest,
2943 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002944
2945 # Put the namespaces of the IO module we are testing and some useful mock
2946 # classes in the __dict__ of each test.
2947 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00002948 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002949 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2950 c_io_ns = {name : getattr(io, name) for name in all_members}
2951 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2952 globs = globals()
2953 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2954 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2955 # Avoid turning open into a bound method.
2956 py_io_ns["open"] = pyio.OpenWrapper
2957 for test in tests:
2958 if test.__name__.startswith("C"):
2959 for name, obj in c_io_ns.items():
2960 setattr(test, name, obj)
2961 elif test.__name__.startswith("Py"):
2962 for name, obj in py_io_ns.items():
2963 setattr(test, name, obj)
2964
2965 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002966
2967if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002968 test_main()