blob: 72c9a2d8a730c1df71ddabeb3971bc8ede46cfc0 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000045
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000047def _default_chunk_size():
48 """Get the default TextIOWrapper chunk size"""
49 with open(__file__, "r", encoding="latin1") as f:
50 return f._CHUNK_SIZE
51
52
Antoine Pitrou328ec742010-09-14 18:37:24 +000053class MockRawIOWithoutRead:
54 """A RawIO implementation without read(), so as to exercise the default
55 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000056
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000057 def __init__(self, read_stack=()):
58 self._read_stack = list(read_stack)
59 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000060 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000061 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000062
Guido van Rossum01a27522007-03-07 01:00:12 +000063 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000065 return len(b)
66
67 def writable(self):
68 return True
69
Guido van Rossum68bbcd22007-02-27 17:19:33 +000070 def fileno(self):
71 return 42
72
73 def readable(self):
74 return True
75
Guido van Rossum01a27522007-03-07 01:00:12 +000076 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000077 return True
78
Guido van Rossum01a27522007-03-07 01:00:12 +000079 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000080 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000081
82 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000083 return 0 # same comment as above
84
85 def readinto(self, buf):
86 self._reads += 1
87 max_len = len(buf)
88 try:
89 data = self._read_stack[0]
90 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000091 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000092 return 0
93 if data is None:
94 del self._read_stack[0]
95 return None
96 n = len(data)
97 if len(data) <= max_len:
98 del self._read_stack[0]
99 buf[:n] = data
100 return n
101 else:
102 buf[:] = data[:max_len]
103 self._read_stack[0] = data[max_len:]
104 return max_len
105
106 def truncate(self, pos=None):
107 return pos
108
Antoine Pitrou328ec742010-09-14 18:37:24 +0000109class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
110 pass
111
112class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
113 pass
114
115
116class MockRawIO(MockRawIOWithoutRead):
117
118 def read(self, n=None):
119 self._reads += 1
120 try:
121 return self._read_stack.pop(0)
122 except:
123 self._extraneous_reads += 1
124 return b""
125
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000126class CMockRawIO(MockRawIO, io.RawIOBase):
127 pass
128
129class PyMockRawIO(MockRawIO, pyio.RawIOBase):
130 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000131
Guido van Rossuma9e20242007-03-08 00:43:48 +0000132
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000133class MisbehavedRawIO(MockRawIO):
134 def write(self, b):
135 return super().write(b) * 2
136
137 def read(self, n=None):
138 return super().read(n) * 2
139
140 def seek(self, pos, whence):
141 return -123
142
143 def tell(self):
144 return -456
145
146 def readinto(self, buf):
147 super().readinto(buf)
148 return len(buf) * 5
149
150class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
151 pass
152
153class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
154 pass
155
156
157class CloseFailureIO(MockRawIO):
158 closed = 0
159
160 def close(self):
161 if not self.closed:
162 self.closed = 1
163 raise IOError
164
165class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
166 pass
167
168class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
169 pass
170
171
172class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000173
174 def __init__(self, data):
175 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000176 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000177
178 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000179 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000180 self.read_history.append(None if res is None else len(res))
181 return res
182
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000183 def readinto(self, b):
184 res = super().readinto(b)
185 self.read_history.append(res)
186 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188class CMockFileIO(MockFileIO, io.BytesIO):
189 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000190
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000191class PyMockFileIO(MockFileIO, pyio.BytesIO):
192 pass
193
194
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000195class MockUnseekableIO:
196 def seekable(self):
197 return False
198
199 def seek(self, *args):
200 raise self.UnsupportedOperation("not seekable")
201
202 def tell(self, *args):
203 raise self.UnsupportedOperation("not seekable")
204
205class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
206 UnsupportedOperation = io.UnsupportedOperation
207
208class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
209 UnsupportedOperation = pyio.UnsupportedOperation
210
211
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000212class MockNonBlockWriterIO:
213
214 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000215 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000216 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000217
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000218 def pop_written(self):
219 s = b"".join(self._write_stack)
220 self._write_stack[:] = []
221 return s
222
223 def block_on(self, char):
224 """Block when a given char is encountered."""
225 self._blocker_char = char
226
227 def readable(self):
228 return True
229
230 def seekable(self):
231 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000232
Guido van Rossum01a27522007-03-07 01:00:12 +0000233 def writable(self):
234 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000235
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000236 def write(self, b):
237 b = bytes(b)
238 n = -1
239 if self._blocker_char:
240 try:
241 n = b.index(self._blocker_char)
242 except ValueError:
243 pass
244 else:
245 self._blocker_char = None
246 self._write_stack.append(b[:n])
247 raise self.BlockingIOError(0, "test blocking", n)
248 self._write_stack.append(b)
249 return len(b)
250
251class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
252 BlockingIOError = io.BlockingIOError
253
254class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
255 BlockingIOError = pyio.BlockingIOError
256
Guido van Rossuma9e20242007-03-08 00:43:48 +0000257
Guido van Rossum28524c72007-02-27 05:47:44 +0000258class IOTest(unittest.TestCase):
259
Neal Norwitze7789b12008-03-24 06:18:09 +0000260 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000261 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000262
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000263 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000264 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000265
Guido van Rossum28524c72007-02-27 05:47:44 +0000266 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000267 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000268 f.truncate(0)
269 self.assertEqual(f.tell(), 5)
270 f.seek(0)
271
272 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000273 self.assertEqual(f.seek(0), 0)
274 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000275 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000276 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000277 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000278 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000279 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000280 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000281 self.assertEqual(f.seek(-1, 2), 13)
282 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000283
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000285 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000286 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000287
Guido van Rossum9b76da62007-04-11 01:09:03 +0000288 def read_ops(self, f, buffered=False):
289 data = f.read(5)
290 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000291 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000292 self.assertEqual(f.readinto(data), 5)
293 self.assertEqual(data, b" worl")
294 self.assertEqual(f.readinto(data), 2)
295 self.assertEqual(len(data), 5)
296 self.assertEqual(data[:2], b"d\n")
297 self.assertEqual(f.seek(0), 0)
298 self.assertEqual(f.read(20), b"hello world\n")
299 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000300 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000301 self.assertEqual(f.seek(-6, 2), 6)
302 self.assertEqual(f.read(5), b"world")
303 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000304 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000305 self.assertEqual(f.seek(-6, 1), 5)
306 self.assertEqual(f.read(5), b" worl")
307 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000308 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000309 if buffered:
310 f.seek(0)
311 self.assertEqual(f.read(), b"hello world\n")
312 f.seek(6)
313 self.assertEqual(f.read(), b"world\n")
314 self.assertEqual(f.read(), b"")
315
Guido van Rossum34d69e52007-04-10 20:08:41 +0000316 LARGE = 2**31
317
Guido van Rossum53807da2007-04-10 19:01:47 +0000318 def large_file_ops(self, f):
319 assert f.readable()
320 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000321 self.assertEqual(f.seek(self.LARGE), self.LARGE)
322 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000323 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000324 self.assertEqual(f.tell(), self.LARGE + 3)
325 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000326 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000327 self.assertEqual(f.tell(), self.LARGE + 2)
328 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000329 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000330 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000331 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
332 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000333 self.assertEqual(f.read(2), b"x")
334
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000335 def test_invalid_operations(self):
336 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000337 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000338 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000339 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000340 self.assertRaises(exc, fp.read)
341 self.assertRaises(exc, fp.readline)
342 with self.open(support.TESTFN, "wb", buffering=0) as fp:
343 self.assertRaises(exc, fp.read)
344 self.assertRaises(exc, fp.readline)
345 with self.open(support.TESTFN, "rb", buffering=0) as fp:
346 self.assertRaises(exc, fp.write, b"blah")
347 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000348 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000349 self.assertRaises(exc, fp.write, b"blah")
350 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000351 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000352 self.assertRaises(exc, fp.write, "blah")
353 self.assertRaises(exc, fp.writelines, ["blah\n"])
354 # Non-zero seeking from current or end pos
355 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
356 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000357
Guido van Rossum28524c72007-02-27 05:47:44 +0000358 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000359 with self.open(support.TESTFN, "wb", buffering=0) as f:
360 self.assertEqual(f.readable(), False)
361 self.assertEqual(f.writable(), True)
362 self.assertEqual(f.seekable(), True)
363 self.write_ops(f)
364 with self.open(support.TESTFN, "rb", buffering=0) as f:
365 self.assertEqual(f.readable(), True)
366 self.assertEqual(f.writable(), False)
367 self.assertEqual(f.seekable(), True)
368 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000369
Guido van Rossum87429772007-04-10 21:06:59 +0000370 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000371 with self.open(support.TESTFN, "wb") as f:
372 self.assertEqual(f.readable(), False)
373 self.assertEqual(f.writable(), True)
374 self.assertEqual(f.seekable(), True)
375 self.write_ops(f)
376 with self.open(support.TESTFN, "rb") as f:
377 self.assertEqual(f.readable(), True)
378 self.assertEqual(f.writable(), False)
379 self.assertEqual(f.seekable(), True)
380 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000381
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000382 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000383 with self.open(support.TESTFN, "wb") as f:
384 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
385 with self.open(support.TESTFN, "rb") as f:
386 self.assertEqual(f.readline(), b"abc\n")
387 self.assertEqual(f.readline(10), b"def\n")
388 self.assertEqual(f.readline(2), b"xy")
389 self.assertEqual(f.readline(4), b"zzy\n")
390 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000391 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000392 self.assertRaises(TypeError, f.readline, 5.3)
393 with self.open(support.TESTFN, "r") as f:
394 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000395
Guido van Rossum28524c72007-02-27 05:47:44 +0000396 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000397 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000398 self.write_ops(f)
399 data = f.getvalue()
400 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000401 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000402 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000403
Guido van Rossum53807da2007-04-10 19:01:47 +0000404 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000405 # On Windows and Mac OSX this test comsumes large resources; It takes
406 # a long time to build the >2GB file and takes >2GB of disk space
407 # therefore the resource must be enabled to run this test.
408 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000409 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000410 print("\nTesting large file ops skipped on %s." % sys.platform,
411 file=sys.stderr)
412 print("It requires %d bytes and a long time." % self.LARGE,
413 file=sys.stderr)
414 print("Use 'regrtest.py -u largefile test_io' to run it.",
415 file=sys.stderr)
416 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000417 with self.open(support.TESTFN, "w+b", 0) as f:
418 self.large_file_ops(f)
419 with self.open(support.TESTFN, "w+b") as f:
420 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000421
422 def test_with_open(self):
423 for bufsize in (0, 1, 100):
424 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000425 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000426 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000427 self.assertEqual(f.closed, True)
428 f = None
429 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000430 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000431 1/0
432 except ZeroDivisionError:
433 self.assertEqual(f.closed, True)
434 else:
435 self.fail("1/0 didn't raise an exception")
436
Antoine Pitrou08838b62009-01-21 00:55:13 +0000437 # issue 5008
438 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000439 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000440 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000441 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000442 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000443 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000444 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000445 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000446 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000447
Guido van Rossum87429772007-04-10 21:06:59 +0000448 def test_destructor(self):
449 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000450 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000451 def __del__(self):
452 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 try:
454 f = super().__del__
455 except AttributeError:
456 pass
457 else:
458 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000459 def close(self):
460 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000461 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000462 def flush(self):
463 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000464 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000465 with support.check_warnings(('', ResourceWarning)):
466 f = MyFileIO(support.TESTFN, "wb")
467 f.write(b"xxx")
468 del f
469 support.gc_collect()
470 self.assertEqual(record, [1, 2, 3])
471 with self.open(support.TESTFN, "rb") as f:
472 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000473
474 def _check_base_destructor(self, base):
475 record = []
476 class MyIO(base):
477 def __init__(self):
478 # This exercises the availability of attributes on object
479 # destruction.
480 # (in the C version, close() is called by the tp_dealloc
481 # function, not by __del__)
482 self.on_del = 1
483 self.on_close = 2
484 self.on_flush = 3
485 def __del__(self):
486 record.append(self.on_del)
487 try:
488 f = super().__del__
489 except AttributeError:
490 pass
491 else:
492 f()
493 def close(self):
494 record.append(self.on_close)
495 super().close()
496 def flush(self):
497 record.append(self.on_flush)
498 super().flush()
499 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000500 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000501 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000502 self.assertEqual(record, [1, 2, 3])
503
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000504 def test_IOBase_destructor(self):
505 self._check_base_destructor(self.IOBase)
506
507 def test_RawIOBase_destructor(self):
508 self._check_base_destructor(self.RawIOBase)
509
510 def test_BufferedIOBase_destructor(self):
511 self._check_base_destructor(self.BufferedIOBase)
512
513 def test_TextIOBase_destructor(self):
514 self._check_base_destructor(self.TextIOBase)
515
Guido van Rossum87429772007-04-10 21:06:59 +0000516 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000517 with self.open(support.TESTFN, "wb") as f:
518 f.write(b"xxx")
519 with self.open(support.TESTFN, "rb") as f:
520 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000521
Guido van Rossumd4103952007-04-12 05:44:49 +0000522 def test_array_writes(self):
523 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000524 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000525 with self.open(support.TESTFN, "wb", 0) as f:
526 self.assertEqual(f.write(a), n)
527 with self.open(support.TESTFN, "wb") as f:
528 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000529
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000530 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000531 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000532 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000533
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000534 def test_read_closed(self):
535 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000536 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000537 with self.open(support.TESTFN, "r") as f:
538 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000539 self.assertEqual(file.read(), "egg\n")
540 file.seek(0)
541 file.close()
542 self.assertRaises(ValueError, file.read)
543
544 def test_no_closefd_with_filename(self):
545 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000546 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000547
548 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000549 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000550 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000551 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000552 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000553 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000554 self.assertEqual(file.buffer.raw.closefd, False)
555
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000556 def test_garbage_collection(self):
557 # FileIO objects are collected, and collecting them flushes
558 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000559 with support.check_warnings(('', ResourceWarning)):
560 f = self.FileIO(support.TESTFN, "wb")
561 f.write(b"abcxxx")
562 f.f = f
563 wr = weakref.ref(f)
564 del f
565 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000566 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000567 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000568 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000569
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000570 def test_unbounded_file(self):
571 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
572 zero = "/dev/zero"
573 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000574 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000575 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000576 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000577 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000578 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000579 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000580 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000581 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000582 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000583 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000584 self.assertRaises(OverflowError, f.read)
585
Antoine Pitrou6be88762010-05-03 16:48:20 +0000586 def test_flush_error_on_close(self):
587 f = self.open(support.TESTFN, "wb", buffering=0)
588 def bad_flush():
589 raise IOError()
590 f.flush = bad_flush
591 self.assertRaises(IOError, f.close) # exception not swallowed
592
593 def test_multi_close(self):
594 f = self.open(support.TESTFN, "wb", buffering=0)
595 f.close()
596 f.close()
597 f.close()
598 self.assertRaises(ValueError, f.flush)
599
Antoine Pitrou328ec742010-09-14 18:37:24 +0000600 def test_RawIOBase_read(self):
601 # Exercise the default RawIOBase.read() implementation (which calls
602 # readinto() internally).
603 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
604 self.assertEqual(rawio.read(2), b"ab")
605 self.assertEqual(rawio.read(2), b"c")
606 self.assertEqual(rawio.read(2), b"d")
607 self.assertEqual(rawio.read(2), None)
608 self.assertEqual(rawio.read(2), b"ef")
609 self.assertEqual(rawio.read(2), b"g")
610 self.assertEqual(rawio.read(2), None)
611 self.assertEqual(rawio.read(2), b"")
612
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000613class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +0200614
615 def test_IOBase_finalize(self):
616 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
617 # class which inherits IOBase and an object of this class are caught
618 # in a reference cycle and close() is already in the method cache.
619 class MyIO(self.IOBase):
620 def close(self):
621 pass
622
623 # create an instance to populate the method cache
624 MyIO()
625 obj = MyIO()
626 obj.obj = obj
627 wr = weakref.ref(obj)
628 del MyIO
629 del obj
630 support.gc_collect()
631 self.assertTrue(wr() is None, wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000632
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000633class PyIOTest(IOTest):
634 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000635
Guido van Rossuma9e20242007-03-08 00:43:48 +0000636
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000637class CommonBufferedTests:
638 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
639
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000640 def test_detach(self):
641 raw = self.MockRawIO()
642 buf = self.tp(raw)
643 self.assertIs(buf.detach(), raw)
644 self.assertRaises(ValueError, buf.detach)
645
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000646 def test_fileno(self):
647 rawio = self.MockRawIO()
648 bufio = self.tp(rawio)
649
Ezio Melottib3aedd42010-11-20 19:04:17 +0000650 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000651
652 def test_no_fileno(self):
653 # XXX will we always have fileno() function? If so, kill
654 # this test. Else, write it.
655 pass
656
657 def test_invalid_args(self):
658 rawio = self.MockRawIO()
659 bufio = self.tp(rawio)
660 # Invalid whence
661 self.assertRaises(ValueError, bufio.seek, 0, -1)
662 self.assertRaises(ValueError, bufio.seek, 0, 3)
663
664 def test_override_destructor(self):
665 tp = self.tp
666 record = []
667 class MyBufferedIO(tp):
668 def __del__(self):
669 record.append(1)
670 try:
671 f = super().__del__
672 except AttributeError:
673 pass
674 else:
675 f()
676 def close(self):
677 record.append(2)
678 super().close()
679 def flush(self):
680 record.append(3)
681 super().flush()
682 rawio = self.MockRawIO()
683 bufio = MyBufferedIO(rawio)
684 writable = bufio.writable()
685 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000686 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000687 if writable:
688 self.assertEqual(record, [1, 2, 3])
689 else:
690 self.assertEqual(record, [1, 2])
691
692 def test_context_manager(self):
693 # Test usability as a context manager
694 rawio = self.MockRawIO()
695 bufio = self.tp(rawio)
696 def _with():
697 with bufio:
698 pass
699 _with()
700 # bufio should now be closed, and using it a second time should raise
701 # a ValueError.
702 self.assertRaises(ValueError, _with)
703
704 def test_error_through_destructor(self):
705 # Test that the exception state is not modified by a destructor,
706 # even if close() fails.
707 rawio = self.CloseFailureIO()
708 def f():
709 self.tp(rawio).xyzzy
710 with support.captured_output("stderr") as s:
711 self.assertRaises(AttributeError, f)
712 s = s.getvalue().strip()
713 if s:
714 # The destructor *may* have printed an unraisable error, check it
715 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000716 self.assertTrue(s.startswith("Exception IOError: "), s)
717 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000718
Antoine Pitrou716c4442009-05-23 19:04:03 +0000719 def test_repr(self):
720 raw = self.MockRawIO()
721 b = self.tp(raw)
722 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
723 self.assertEqual(repr(b), "<%s>" % clsname)
724 raw.name = "dummy"
725 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
726 raw.name = b"dummy"
727 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
728
Antoine Pitrou6be88762010-05-03 16:48:20 +0000729 def test_flush_error_on_close(self):
730 raw = self.MockRawIO()
731 def bad_flush():
732 raise IOError()
733 raw.flush = bad_flush
734 b = self.tp(raw)
735 self.assertRaises(IOError, b.close) # exception not swallowed
736
737 def test_multi_close(self):
738 raw = self.MockRawIO()
739 b = self.tp(raw)
740 b.close()
741 b.close()
742 b.close()
743 self.assertRaises(ValueError, b.flush)
744
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000745 def test_unseekable(self):
746 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
747 self.assertRaises(self.UnsupportedOperation, bufio.tell)
748 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
749
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000750 def test_readonly_attributes(self):
751 raw = self.MockRawIO()
752 buf = self.tp(raw)
753 x = self.MockRawIO()
754 with self.assertRaises(AttributeError):
755 buf.raw = x
756
Guido van Rossum78892e42007-04-06 17:31:18 +0000757
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000758class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
759 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000760
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000761 def test_constructor(self):
762 rawio = self.MockRawIO([b"abc"])
763 bufio = self.tp(rawio)
764 bufio.__init__(rawio)
765 bufio.__init__(rawio, buffer_size=1024)
766 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000767 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000768 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
769 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
770 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
771 rawio = self.MockRawIO([b"abc"])
772 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000773 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000774
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000775 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000776 for arg in (None, 7):
777 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
778 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000779 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000780 # Invalid args
781 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000782
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000783 def test_read1(self):
784 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
785 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000786 self.assertEqual(b"a", bufio.read(1))
787 self.assertEqual(b"b", bufio.read1(1))
788 self.assertEqual(rawio._reads, 1)
789 self.assertEqual(b"c", bufio.read1(100))
790 self.assertEqual(rawio._reads, 1)
791 self.assertEqual(b"d", bufio.read1(100))
792 self.assertEqual(rawio._reads, 2)
793 self.assertEqual(b"efg", bufio.read1(100))
794 self.assertEqual(rawio._reads, 3)
795 self.assertEqual(b"", bufio.read1(100))
796 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000797 # Invalid args
798 self.assertRaises(ValueError, bufio.read1, -1)
799
800 def test_readinto(self):
801 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
802 bufio = self.tp(rawio)
803 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000804 self.assertEqual(bufio.readinto(b), 2)
805 self.assertEqual(b, b"ab")
806 self.assertEqual(bufio.readinto(b), 2)
807 self.assertEqual(b, b"cd")
808 self.assertEqual(bufio.readinto(b), 2)
809 self.assertEqual(b, b"ef")
810 self.assertEqual(bufio.readinto(b), 1)
811 self.assertEqual(b, b"gf")
812 self.assertEqual(bufio.readinto(b), 0)
813 self.assertEqual(b, b"gf")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000814
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000815 def test_readlines(self):
816 def bufio():
817 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
818 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000819 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
820 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
821 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000822
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000823 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000824 data = b"abcdefghi"
825 dlen = len(data)
826
827 tests = [
828 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
829 [ 100, [ 3, 3, 3], [ dlen ] ],
830 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
831 ]
832
833 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000834 rawio = self.MockFileIO(data)
835 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000836 pos = 0
837 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000838 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000839 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000840 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000841 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000842
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000843 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000844 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000845 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
846 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000847 self.assertEqual(b"abcd", bufio.read(6))
848 self.assertEqual(b"e", bufio.read(1))
849 self.assertEqual(b"fg", bufio.read())
850 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200851 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000852 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000853
Victor Stinnera80987f2011-05-25 22:47:16 +0200854 rawio = self.MockRawIO((b"a", None, None))
855 self.assertEqual(b"a", rawio.readall())
856 self.assertIsNone(rawio.readall())
857
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000858 def test_read_past_eof(self):
859 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
860 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000861
Ezio Melottib3aedd42010-11-20 19:04:17 +0000862 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000863
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000864 def test_read_all(self):
865 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
866 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000867
Ezio Melottib3aedd42010-11-20 19:04:17 +0000868 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000869
Victor Stinner45df8202010-04-28 22:31:17 +0000870 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000871 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000872 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000873 try:
874 # Write out many bytes with exactly the same number of 0's,
875 # 1's... 255's. This will help us check that concurrent reading
876 # doesn't duplicate or forget contents.
877 N = 1000
878 l = list(range(256)) * N
879 random.shuffle(l)
880 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000881 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000882 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000883 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000884 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000885 errors = []
886 results = []
887 def f():
888 try:
889 # Intra-buffer read then buffer-flushing read
890 for n in cycle([1, 19]):
891 s = bufio.read(n)
892 if not s:
893 break
894 # list.append() is atomic
895 results.append(s)
896 except Exception as e:
897 errors.append(e)
898 raise
899 threads = [threading.Thread(target=f) for x in range(20)]
900 for t in threads:
901 t.start()
902 time.sleep(0.02) # yield
903 for t in threads:
904 t.join()
905 self.assertFalse(errors,
906 "the following exceptions were caught: %r" % errors)
907 s = b''.join(results)
908 for i in range(256):
909 c = bytes(bytearray([i]))
910 self.assertEqual(s.count(c), N)
911 finally:
912 support.unlink(support.TESTFN)
913
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000914 def test_misbehaved_io(self):
915 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
916 bufio = self.tp(rawio)
917 self.assertRaises(IOError, bufio.seek, 0)
918 self.assertRaises(IOError, bufio.tell)
919
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000920 def test_no_extraneous_read(self):
921 # Issue #9550; when the raw IO object has satisfied the read request,
922 # we should not issue any additional reads, otherwise it may block
923 # (e.g. socket).
924 bufsize = 16
925 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
926 rawio = self.MockRawIO([b"x" * n])
927 bufio = self.tp(rawio, bufsize)
928 self.assertEqual(bufio.read(n), b"x" * n)
929 # Simple case: one raw read is enough to satisfy the request.
930 self.assertEqual(rawio._extraneous_reads, 0,
931 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
932 # A more complex case where two raw reads are needed to satisfy
933 # the request.
934 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
935 bufio = self.tp(rawio, bufsize)
936 self.assertEqual(bufio.read(n), b"x" * n)
937 self.assertEqual(rawio._extraneous_reads, 0,
938 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
939
940
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000941class CBufferedReaderTest(BufferedReaderTest):
942 tp = io.BufferedReader
943
944 def test_constructor(self):
945 BufferedReaderTest.test_constructor(self)
946 # The allocation can succeed on 32-bit builds, e.g. with more
947 # than 2GB RAM and a 64-bit kernel.
948 if sys.maxsize > 0x7FFFFFFF:
949 rawio = self.MockRawIO()
950 bufio = self.tp(rawio)
951 self.assertRaises((OverflowError, MemoryError, ValueError),
952 bufio.__init__, rawio, sys.maxsize)
953
954 def test_initialization(self):
955 rawio = self.MockRawIO([b"abc"])
956 bufio = self.tp(rawio)
957 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
958 self.assertRaises(ValueError, bufio.read)
959 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
960 self.assertRaises(ValueError, bufio.read)
961 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
962 self.assertRaises(ValueError, bufio.read)
963
964 def test_misbehaved_io_read(self):
965 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
966 bufio = self.tp(rawio)
967 # _pyio.BufferedReader seems to implement reading different, so that
968 # checking this is not so easy.
969 self.assertRaises(IOError, bufio.read, 10)
970
971 def test_garbage_collection(self):
972 # C BufferedReader objects are collected.
973 # The Python version has __del__, so it ends into gc.garbage instead
974 rawio = self.FileIO(support.TESTFN, "w+b")
975 f = self.tp(rawio)
976 f.f = f
977 wr = weakref.ref(f)
978 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000979 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000980 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000981
982class PyBufferedReaderTest(BufferedReaderTest):
983 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000984
Guido van Rossuma9e20242007-03-08 00:43:48 +0000985
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000986class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
987 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000988
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000989 def test_constructor(self):
990 rawio = self.MockRawIO()
991 bufio = self.tp(rawio)
992 bufio.__init__(rawio)
993 bufio.__init__(rawio, buffer_size=1024)
994 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000995 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000996 bufio.flush()
997 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
998 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
999 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1000 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001001 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001002 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001003 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001004
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001005 def test_detach_flush(self):
1006 raw = self.MockRawIO()
1007 buf = self.tp(raw)
1008 buf.write(b"howdy!")
1009 self.assertFalse(raw._write_stack)
1010 buf.detach()
1011 self.assertEqual(raw._write_stack, [b"howdy!"])
1012
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001013 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001014 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001015 writer = self.MockRawIO()
1016 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001017 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001018 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001019
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001020 def test_write_overflow(self):
1021 writer = self.MockRawIO()
1022 bufio = self.tp(writer, 8)
1023 contents = b"abcdefghijklmnop"
1024 for n in range(0, len(contents), 3):
1025 bufio.write(contents[n:n+3])
1026 flushed = b"".join(writer._write_stack)
1027 # At least (total - 8) bytes were implicitly flushed, perhaps more
1028 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001029 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001030
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001031 def check_writes(self, intermediate_func):
1032 # Lots of writes, test the flushed output is as expected.
1033 contents = bytes(range(256)) * 1000
1034 n = 0
1035 writer = self.MockRawIO()
1036 bufio = self.tp(writer, 13)
1037 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1038 def gen_sizes():
1039 for size in count(1):
1040 for i in range(15):
1041 yield size
1042 sizes = gen_sizes()
1043 while n < len(contents):
1044 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001045 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001046 intermediate_func(bufio)
1047 n += size
1048 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001049 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001050
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001051 def test_writes(self):
1052 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001053
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001054 def test_writes_and_flushes(self):
1055 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001056
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001057 def test_writes_and_seeks(self):
1058 def _seekabs(bufio):
1059 pos = bufio.tell()
1060 bufio.seek(pos + 1, 0)
1061 bufio.seek(pos - 1, 0)
1062 bufio.seek(pos, 0)
1063 self.check_writes(_seekabs)
1064 def _seekrel(bufio):
1065 pos = bufio.seek(0, 1)
1066 bufio.seek(+1, 1)
1067 bufio.seek(-1, 1)
1068 bufio.seek(pos, 0)
1069 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001070
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001071 def test_writes_and_truncates(self):
1072 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001073
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001074 def test_write_non_blocking(self):
1075 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001076 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001077
Ezio Melottib3aedd42010-11-20 19:04:17 +00001078 self.assertEqual(bufio.write(b"abcd"), 4)
1079 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001080 # 1 byte will be written, the rest will be buffered
1081 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001082 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001083
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001084 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1085 raw.block_on(b"0")
1086 try:
1087 bufio.write(b"opqrwxyz0123456789")
1088 except self.BlockingIOError as e:
1089 written = e.characters_written
1090 else:
1091 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001092 self.assertEqual(written, 16)
1093 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001094 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001095
Ezio Melottib3aedd42010-11-20 19:04:17 +00001096 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001097 s = raw.pop_written()
1098 # Previously buffered bytes were flushed
1099 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001100
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001101 def test_write_and_rewind(self):
1102 raw = io.BytesIO()
1103 bufio = self.tp(raw, 4)
1104 self.assertEqual(bufio.write(b"abcdef"), 6)
1105 self.assertEqual(bufio.tell(), 6)
1106 bufio.seek(0, 0)
1107 self.assertEqual(bufio.write(b"XY"), 2)
1108 bufio.seek(6, 0)
1109 self.assertEqual(raw.getvalue(), b"XYcdef")
1110 self.assertEqual(bufio.write(b"123456"), 6)
1111 bufio.flush()
1112 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001113
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001114 def test_flush(self):
1115 writer = self.MockRawIO()
1116 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001117 bufio.write(b"abc")
1118 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001119 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001120
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001121 def test_destructor(self):
1122 writer = self.MockRawIO()
1123 bufio = self.tp(writer, 8)
1124 bufio.write(b"abc")
1125 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001126 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001127 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001128
1129 def test_truncate(self):
1130 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001131 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001132 bufio = self.tp(raw, 8)
1133 bufio.write(b"abcdef")
1134 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001135 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001136 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001137 self.assertEqual(f.read(), b"abc")
1138
Victor Stinner45df8202010-04-28 22:31:17 +00001139 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001140 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001141 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001142 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001143 # Write out many bytes from many threads and test they were
1144 # all flushed.
1145 N = 1000
1146 contents = bytes(range(256)) * N
1147 sizes = cycle([1, 19])
1148 n = 0
1149 queue = deque()
1150 while n < len(contents):
1151 size = next(sizes)
1152 queue.append(contents[n:n+size])
1153 n += size
1154 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001155 # We use a real file object because it allows us to
1156 # exercise situations where the GIL is released before
1157 # writing the buffer to the raw streams. This is in addition
1158 # to concurrency issues due to switching threads in the middle
1159 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001160 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001161 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001162 errors = []
1163 def f():
1164 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001165 while True:
1166 try:
1167 s = queue.popleft()
1168 except IndexError:
1169 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001170 bufio.write(s)
1171 except Exception as e:
1172 errors.append(e)
1173 raise
1174 threads = [threading.Thread(target=f) for x in range(20)]
1175 for t in threads:
1176 t.start()
1177 time.sleep(0.02) # yield
1178 for t in threads:
1179 t.join()
1180 self.assertFalse(errors,
1181 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001182 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001183 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001184 s = f.read()
1185 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001186 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001187 finally:
1188 support.unlink(support.TESTFN)
1189
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001190 def test_misbehaved_io(self):
1191 rawio = self.MisbehavedRawIO()
1192 bufio = self.tp(rawio, 5)
1193 self.assertRaises(IOError, bufio.seek, 0)
1194 self.assertRaises(IOError, bufio.tell)
1195 self.assertRaises(IOError, bufio.write, b"abcdef")
1196
Benjamin Peterson59406a92009-03-26 17:10:29 +00001197 def test_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001198 with support.check_warnings(("max_buffer_size is deprecated",
1199 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001200 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001201
1202
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001203class CBufferedWriterTest(BufferedWriterTest):
1204 tp = io.BufferedWriter
1205
1206 def test_constructor(self):
1207 BufferedWriterTest.test_constructor(self)
1208 # The allocation can succeed on 32-bit builds, e.g. with more
1209 # than 2GB RAM and a 64-bit kernel.
1210 if sys.maxsize > 0x7FFFFFFF:
1211 rawio = self.MockRawIO()
1212 bufio = self.tp(rawio)
1213 self.assertRaises((OverflowError, MemoryError, ValueError),
1214 bufio.__init__, rawio, sys.maxsize)
1215
1216 def test_initialization(self):
1217 rawio = self.MockRawIO()
1218 bufio = self.tp(rawio)
1219 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1220 self.assertRaises(ValueError, bufio.write, b"def")
1221 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1222 self.assertRaises(ValueError, bufio.write, b"def")
1223 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1224 self.assertRaises(ValueError, bufio.write, b"def")
1225
1226 def test_garbage_collection(self):
1227 # C BufferedWriter objects are collected, and collecting them flushes
1228 # all data to disk.
1229 # The Python version has __del__, so it ends into gc.garbage instead
1230 rawio = self.FileIO(support.TESTFN, "w+b")
1231 f = self.tp(rawio)
1232 f.write(b"123xxx")
1233 f.x = f
1234 wr = weakref.ref(f)
1235 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001236 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001237 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001238 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001239 self.assertEqual(f.read(), b"123xxx")
1240
1241
1242class PyBufferedWriterTest(BufferedWriterTest):
1243 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001244
Guido van Rossum01a27522007-03-07 01:00:12 +00001245class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001246
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001247 def test_constructor(self):
1248 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001249 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001250
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001251 def test_detach(self):
1252 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1253 self.assertRaises(self.UnsupportedOperation, pair.detach)
1254
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001255 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001256 with support.check_warnings(("max_buffer_size is deprecated",
1257 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001258 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001259
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001260 def test_constructor_with_not_readable(self):
1261 class NotReadable(MockRawIO):
1262 def readable(self):
1263 return False
1264
1265 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1266
1267 def test_constructor_with_not_writeable(self):
1268 class NotWriteable(MockRawIO):
1269 def writable(self):
1270 return False
1271
1272 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1273
1274 def test_read(self):
1275 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1276
1277 self.assertEqual(pair.read(3), b"abc")
1278 self.assertEqual(pair.read(1), b"d")
1279 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001280 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1281 self.assertEqual(pair.read(None), b"abc")
1282
1283 def test_readlines(self):
1284 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1285 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1286 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1287 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001288
1289 def test_read1(self):
1290 # .read1() is delegated to the underlying reader object, so this test
1291 # can be shallow.
1292 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1293
1294 self.assertEqual(pair.read1(3), b"abc")
1295
1296 def test_readinto(self):
1297 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1298
1299 data = bytearray(5)
1300 self.assertEqual(pair.readinto(data), 5)
1301 self.assertEqual(data, b"abcde")
1302
1303 def test_write(self):
1304 w = self.MockRawIO()
1305 pair = self.tp(self.MockRawIO(), w)
1306
1307 pair.write(b"abc")
1308 pair.flush()
1309 pair.write(b"def")
1310 pair.flush()
1311 self.assertEqual(w._write_stack, [b"abc", b"def"])
1312
1313 def test_peek(self):
1314 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1315
1316 self.assertTrue(pair.peek(3).startswith(b"abc"))
1317 self.assertEqual(pair.read(3), b"abc")
1318
1319 def test_readable(self):
1320 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1321 self.assertTrue(pair.readable())
1322
1323 def test_writeable(self):
1324 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1325 self.assertTrue(pair.writable())
1326
1327 def test_seekable(self):
1328 # BufferedRWPairs are never seekable, even if their readers and writers
1329 # are.
1330 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1331 self.assertFalse(pair.seekable())
1332
1333 # .flush() is delegated to the underlying writer object and has been
1334 # tested in the test_write method.
1335
1336 def test_close_and_closed(self):
1337 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1338 self.assertFalse(pair.closed)
1339 pair.close()
1340 self.assertTrue(pair.closed)
1341
1342 def test_isatty(self):
1343 class SelectableIsAtty(MockRawIO):
1344 def __init__(self, isatty):
1345 MockRawIO.__init__(self)
1346 self._isatty = isatty
1347
1348 def isatty(self):
1349 return self._isatty
1350
1351 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1352 self.assertFalse(pair.isatty())
1353
1354 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1355 self.assertTrue(pair.isatty())
1356
1357 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1358 self.assertTrue(pair.isatty())
1359
1360 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1361 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001362
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001363class CBufferedRWPairTest(BufferedRWPairTest):
1364 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001365
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001366class PyBufferedRWPairTest(BufferedRWPairTest):
1367 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001368
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001369
1370class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1371 read_mode = "rb+"
1372 write_mode = "wb+"
1373
1374 def test_constructor(self):
1375 BufferedReaderTest.test_constructor(self)
1376 BufferedWriterTest.test_constructor(self)
1377
1378 def test_read_and_write(self):
1379 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001380 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001381
1382 self.assertEqual(b"as", rw.read(2))
1383 rw.write(b"ddd")
1384 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001385 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001386 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001387 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001388
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001389 def test_seek_and_tell(self):
1390 raw = self.BytesIO(b"asdfghjkl")
1391 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001392
Ezio Melottib3aedd42010-11-20 19:04:17 +00001393 self.assertEqual(b"as", rw.read(2))
1394 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001395 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001396 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001397
Antoine Pitroue05565e2011-08-20 14:39:23 +02001398 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00001399 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02001400 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001401 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001402 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001403 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001404 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001405 self.assertEqual(7, rw.tell())
1406 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02001407 rw.flush()
1408 self.assertEqual(b"asdf123fl", raw.getvalue())
1409
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001410 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001411
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001412 def check_flush_and_read(self, read_func):
1413 raw = self.BytesIO(b"abcdefghi")
1414 bufio = self.tp(raw)
1415
Ezio Melottib3aedd42010-11-20 19:04:17 +00001416 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001417 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001418 self.assertEqual(b"ef", read_func(bufio, 2))
1419 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001420 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001421 self.assertEqual(6, bufio.tell())
1422 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001423 raw.seek(0, 0)
1424 raw.write(b"XYZ")
1425 # flush() resets the read buffer
1426 bufio.flush()
1427 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001428 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001429
1430 def test_flush_and_read(self):
1431 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1432
1433 def test_flush_and_readinto(self):
1434 def _readinto(bufio, n=-1):
1435 b = bytearray(n if n >= 0 else 9999)
1436 n = bufio.readinto(b)
1437 return bytes(b[:n])
1438 self.check_flush_and_read(_readinto)
1439
1440 def test_flush_and_peek(self):
1441 def _peek(bufio, n=-1):
1442 # This relies on the fact that the buffer can contain the whole
1443 # raw stream, otherwise peek() can return less.
1444 b = bufio.peek(n)
1445 if n != -1:
1446 b = b[:n]
1447 bufio.seek(len(b), 1)
1448 return b
1449 self.check_flush_and_read(_peek)
1450
1451 def test_flush_and_write(self):
1452 raw = self.BytesIO(b"abcdefghi")
1453 bufio = self.tp(raw)
1454
1455 bufio.write(b"123")
1456 bufio.flush()
1457 bufio.write(b"45")
1458 bufio.flush()
1459 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001460 self.assertEqual(b"12345fghi", raw.getvalue())
1461 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001462
1463 def test_threads(self):
1464 BufferedReaderTest.test_threads(self)
1465 BufferedWriterTest.test_threads(self)
1466
1467 def test_writes_and_peek(self):
1468 def _peek(bufio):
1469 bufio.peek(1)
1470 self.check_writes(_peek)
1471 def _peek(bufio):
1472 pos = bufio.tell()
1473 bufio.seek(-1, 1)
1474 bufio.peek(1)
1475 bufio.seek(pos, 0)
1476 self.check_writes(_peek)
1477
1478 def test_writes_and_reads(self):
1479 def _read(bufio):
1480 bufio.seek(-1, 1)
1481 bufio.read(1)
1482 self.check_writes(_read)
1483
1484 def test_writes_and_read1s(self):
1485 def _read1(bufio):
1486 bufio.seek(-1, 1)
1487 bufio.read1(1)
1488 self.check_writes(_read1)
1489
1490 def test_writes_and_readintos(self):
1491 def _read(bufio):
1492 bufio.seek(-1, 1)
1493 bufio.readinto(bytearray(1))
1494 self.check_writes(_read)
1495
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001496 def test_write_after_readahead(self):
1497 # Issue #6629: writing after the buffer was filled by readahead should
1498 # first rewind the raw stream.
1499 for overwrite_size in [1, 5]:
1500 raw = self.BytesIO(b"A" * 10)
1501 bufio = self.tp(raw, 4)
1502 # Trigger readahead
1503 self.assertEqual(bufio.read(1), b"A")
1504 self.assertEqual(bufio.tell(), 1)
1505 # Overwriting should rewind the raw stream if it needs so
1506 bufio.write(b"B" * overwrite_size)
1507 self.assertEqual(bufio.tell(), overwrite_size + 1)
1508 # If the write size was smaller than the buffer size, flush() and
1509 # check that rewind happens.
1510 bufio.flush()
1511 self.assertEqual(bufio.tell(), overwrite_size + 1)
1512 s = raw.getvalue()
1513 self.assertEqual(s,
1514 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1515
Antoine Pitrou7c404892011-05-13 00:13:33 +02001516 def test_write_rewind_write(self):
1517 # Various combinations of reading / writing / seeking backwards / writing again
1518 def mutate(bufio, pos1, pos2):
1519 assert pos2 >= pos1
1520 # Fill the buffer
1521 bufio.seek(pos1)
1522 bufio.read(pos2 - pos1)
1523 bufio.write(b'\x02')
1524 # This writes earlier than the previous write, but still inside
1525 # the buffer.
1526 bufio.seek(pos1)
1527 bufio.write(b'\x01')
1528
1529 b = b"\x80\x81\x82\x83\x84"
1530 for i in range(0, len(b)):
1531 for j in range(i, len(b)):
1532 raw = self.BytesIO(b)
1533 bufio = self.tp(raw, 100)
1534 mutate(bufio, i, j)
1535 bufio.flush()
1536 expected = bytearray(b)
1537 expected[j] = 2
1538 expected[i] = 1
1539 self.assertEqual(raw.getvalue(), expected,
1540 "failed result for i=%d, j=%d" % (i, j))
1541
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001542 def test_truncate_after_read_or_write(self):
1543 raw = self.BytesIO(b"A" * 10)
1544 bufio = self.tp(raw, 100)
1545 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1546 self.assertEqual(bufio.truncate(), 2)
1547 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1548 self.assertEqual(bufio.truncate(), 4)
1549
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001550 def test_misbehaved_io(self):
1551 BufferedReaderTest.test_misbehaved_io(self)
1552 BufferedWriterTest.test_misbehaved_io(self)
1553
Antoine Pitroue05565e2011-08-20 14:39:23 +02001554 def test_interleaved_read_write(self):
1555 # Test for issue #12213
1556 with self.BytesIO(b'abcdefgh') as raw:
1557 with self.tp(raw, 100) as f:
1558 f.write(b"1")
1559 self.assertEqual(f.read(1), b'b')
1560 f.write(b'2')
1561 self.assertEqual(f.read1(1), b'd')
1562 f.write(b'3')
1563 buf = bytearray(1)
1564 f.readinto(buf)
1565 self.assertEqual(buf, b'f')
1566 f.write(b'4')
1567 self.assertEqual(f.peek(1), b'h')
1568 f.flush()
1569 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1570
1571 with self.BytesIO(b'abc') as raw:
1572 with self.tp(raw, 100) as f:
1573 self.assertEqual(f.read(1), b'a')
1574 f.write(b"2")
1575 self.assertEqual(f.read(1), b'c')
1576 f.flush()
1577 self.assertEqual(raw.getvalue(), b'a2c')
1578
1579 def test_interleaved_readline_write(self):
1580 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1581 with self.tp(raw) as f:
1582 f.write(b'1')
1583 self.assertEqual(f.readline(), b'b\n')
1584 f.write(b'2')
1585 self.assertEqual(f.readline(), b'def\n')
1586 f.write(b'3')
1587 self.assertEqual(f.readline(), b'\n')
1588 f.flush()
1589 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1590
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001591 # You can't construct a BufferedRandom over a non-seekable stream.
1592 test_unseekable = None
1593
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001594class CBufferedRandomTest(BufferedRandomTest):
1595 tp = io.BufferedRandom
1596
1597 def test_constructor(self):
1598 BufferedRandomTest.test_constructor(self)
1599 # The allocation can succeed on 32-bit builds, e.g. with more
1600 # than 2GB RAM and a 64-bit kernel.
1601 if sys.maxsize > 0x7FFFFFFF:
1602 rawio = self.MockRawIO()
1603 bufio = self.tp(rawio)
1604 self.assertRaises((OverflowError, MemoryError, ValueError),
1605 bufio.__init__, rawio, sys.maxsize)
1606
1607 def test_garbage_collection(self):
1608 CBufferedReaderTest.test_garbage_collection(self)
1609 CBufferedWriterTest.test_garbage_collection(self)
1610
1611class PyBufferedRandomTest(BufferedRandomTest):
1612 tp = pyio.BufferedRandom
1613
1614
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001615# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1616# properties:
1617# - A single output character can correspond to many bytes of input.
1618# - The number of input bytes to complete the character can be
1619# undetermined until the last input byte is received.
1620# - The number of input bytes can vary depending on previous input.
1621# - A single input byte can correspond to many characters of output.
1622# - The number of output characters can be undetermined until the
1623# last input byte is received.
1624# - The number of output characters can vary depending on previous input.
1625
1626class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1627 """
1628 For testing seek/tell behavior with a stateful, buffering decoder.
1629
1630 Input is a sequence of words. Words may be fixed-length (length set
1631 by input) or variable-length (period-terminated). In variable-length
1632 mode, extra periods are ignored. Possible words are:
1633 - 'i' followed by a number sets the input length, I (maximum 99).
1634 When I is set to 0, words are space-terminated.
1635 - 'o' followed by a number sets the output length, O (maximum 99).
1636 - Any other word is converted into a word followed by a period on
1637 the output. The output word consists of the input word truncated
1638 or padded out with hyphens to make its length equal to O. If O
1639 is 0, the word is output verbatim without truncating or padding.
1640 I and O are initially set to 1. When I changes, any buffered input is
1641 re-scanned according to the new I. EOF also terminates the last word.
1642 """
1643
1644 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001645 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001646 self.reset()
1647
1648 def __repr__(self):
1649 return '<SID %x>' % id(self)
1650
1651 def reset(self):
1652 self.i = 1
1653 self.o = 1
1654 self.buffer = bytearray()
1655
1656 def getstate(self):
1657 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1658 return bytes(self.buffer), i*100 + o
1659
1660 def setstate(self, state):
1661 buffer, io = state
1662 self.buffer = bytearray(buffer)
1663 i, o = divmod(io, 100)
1664 self.i, self.o = i ^ 1, o ^ 1
1665
1666 def decode(self, input, final=False):
1667 output = ''
1668 for b in input:
1669 if self.i == 0: # variable-length, terminated with period
1670 if b == ord('.'):
1671 if self.buffer:
1672 output += self.process_word()
1673 else:
1674 self.buffer.append(b)
1675 else: # fixed-length, terminate after self.i bytes
1676 self.buffer.append(b)
1677 if len(self.buffer) == self.i:
1678 output += self.process_word()
1679 if final and self.buffer: # EOF terminates the last word
1680 output += self.process_word()
1681 return output
1682
1683 def process_word(self):
1684 output = ''
1685 if self.buffer[0] == ord('i'):
1686 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1687 elif self.buffer[0] == ord('o'):
1688 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1689 else:
1690 output = self.buffer.decode('ascii')
1691 if len(output) < self.o:
1692 output += '-'*self.o # pad out with hyphens
1693 if self.o:
1694 output = output[:self.o] # truncate to output length
1695 output += '.'
1696 self.buffer = bytearray()
1697 return output
1698
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001699 codecEnabled = False
1700
1701 @classmethod
1702 def lookupTestDecoder(cls, name):
1703 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001704 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001705 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001706 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001707 incrementalencoder=None,
1708 streamreader=None, streamwriter=None,
1709 incrementaldecoder=cls)
1710
1711# Register the previous decoder for testing.
1712# Disabled by default, tests will enable it.
1713codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1714
1715
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001716class StatefulIncrementalDecoderTest(unittest.TestCase):
1717 """
1718 Make sure the StatefulIncrementalDecoder actually works.
1719 """
1720
1721 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001722 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001723 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001724 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001725 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001726 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001727 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001728 # I=0, O=6 (variable-length input, fixed-length output)
1729 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1730 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001731 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001732 # I=6, O=3 (fixed-length input > fixed-length output)
1733 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1734 # I=0, then 3; O=29, then 15 (with longer output)
1735 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1736 'a----------------------------.' +
1737 'b----------------------------.' +
1738 'cde--------------------------.' +
1739 'abcdefghijabcde.' +
1740 'a.b------------.' +
1741 '.c.------------.' +
1742 'd.e------------.' +
1743 'k--------------.' +
1744 'l--------------.' +
1745 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001746 ]
1747
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001748 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001749 # Try a few one-shot test cases.
1750 for input, eof, output in self.test_cases:
1751 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001752 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001753
1754 # Also test an unfinished decode, followed by forcing EOF.
1755 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001756 self.assertEqual(d.decode(b'oiabcd'), '')
1757 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001758
1759class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001760
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001761 def setUp(self):
1762 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1763 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001764 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001765
Guido van Rossumd0712812007-04-11 16:32:43 +00001766 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001767 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001768
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001769 def test_constructor(self):
1770 r = self.BytesIO(b"\xc3\xa9\n\n")
1771 b = self.BufferedReader(r, 1000)
1772 t = self.TextIOWrapper(b)
1773 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001774 self.assertEqual(t.encoding, "latin1")
1775 self.assertEqual(t.line_buffering, False)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001776 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001777 self.assertEqual(t.encoding, "utf8")
1778 self.assertEqual(t.line_buffering, True)
1779 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001780 self.assertRaises(TypeError, t.__init__, b, newline=42)
1781 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1782
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001783 def test_detach(self):
1784 r = self.BytesIO()
1785 b = self.BufferedWriter(r)
1786 t = self.TextIOWrapper(b)
1787 self.assertIs(t.detach(), b)
1788
1789 t = self.TextIOWrapper(b, encoding="ascii")
1790 t.write("howdy")
1791 self.assertFalse(r.getvalue())
1792 t.detach()
1793 self.assertEqual(r.getvalue(), b"howdy")
1794 self.assertRaises(ValueError, t.detach)
1795
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001796 def test_repr(self):
1797 raw = self.BytesIO("hello".encode("utf-8"))
1798 b = self.BufferedReader(raw)
1799 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001800 modname = self.TextIOWrapper.__module__
1801 self.assertEqual(repr(t),
1802 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1803 raw.name = "dummy"
1804 self.assertEqual(repr(t),
1805 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001806 t.mode = "r"
1807 self.assertEqual(repr(t),
1808 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001809 raw.name = b"dummy"
1810 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001811 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001812
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001813 def test_line_buffering(self):
1814 r = self.BytesIO()
1815 b = self.BufferedWriter(r, 1000)
1816 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001817 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001818 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001819 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001820 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001821 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001822 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001823
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001824 def test_encoding(self):
1825 # Check the encoding attribute is always set, and valid
1826 b = self.BytesIO()
1827 t = self.TextIOWrapper(b, encoding="utf8")
1828 self.assertEqual(t.encoding, "utf8")
1829 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001830 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001831 codecs.lookup(t.encoding)
1832
1833 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001834 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001835 b = self.BytesIO(b"abc\n\xff\n")
1836 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001837 self.assertRaises(UnicodeError, t.read)
1838 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001839 b = self.BytesIO(b"abc\n\xff\n")
1840 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001841 self.assertRaises(UnicodeError, t.read)
1842 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001843 b = self.BytesIO(b"abc\n\xff\n")
1844 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001845 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001846 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001847 b = self.BytesIO(b"abc\n\xff\n")
1848 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001849 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001850
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001851 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001852 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001853 b = self.BytesIO()
1854 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001855 self.assertRaises(UnicodeError, t.write, "\xff")
1856 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001857 b = self.BytesIO()
1858 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001859 self.assertRaises(UnicodeError, t.write, "\xff")
1860 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001861 b = self.BytesIO()
1862 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001863 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001864 t.write("abc\xffdef\n")
1865 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001866 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001867 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001868 b = self.BytesIO()
1869 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001870 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001871 t.write("abc\xffdef\n")
1872 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001873 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001874
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001875 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001876 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1877
1878 tests = [
1879 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001880 [ '', input_lines ],
1881 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1882 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1883 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001884 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001885 encodings = (
1886 'utf-8', 'latin-1',
1887 'utf-16', 'utf-16-le', 'utf-16-be',
1888 'utf-32', 'utf-32-le', 'utf-32-be',
1889 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001890
Guido van Rossum8358db22007-08-18 21:39:55 +00001891 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001892 # character in TextIOWrapper._pending_line.
1893 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001894 # XXX: str.encode() should return bytes
1895 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001896 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001897 for bufsize in range(1, 10):
1898 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001899 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1900 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001901 encoding=encoding)
1902 if do_reads:
1903 got_lines = []
1904 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001905 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001906 if c2 == '':
1907 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00001908 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001909 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001910 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001911 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001912
1913 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001914 self.assertEqual(got_line, exp_line)
1915 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001916
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001917 def test_newlines_input(self):
1918 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001919 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1920 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001921 (None, normalized.decode("ascii").splitlines(True)),
1922 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001923 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1924 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1925 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001926 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001927 buf = self.BytesIO(testdata)
1928 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001929 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00001930 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001931 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001932
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001933 def test_newlines_output(self):
1934 testdict = {
1935 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1936 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1937 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1938 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1939 }
1940 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1941 for newline, expected in tests:
1942 buf = self.BytesIO()
1943 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1944 txt.write("AAA\nB")
1945 txt.write("BB\nCCC\n")
1946 txt.write("X\rY\r\nZ")
1947 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001948 self.assertEqual(buf.closed, False)
1949 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001950
1951 def test_destructor(self):
1952 l = []
1953 base = self.BytesIO
1954 class MyBytesIO(base):
1955 def close(self):
1956 l.append(self.getvalue())
1957 base.close(self)
1958 b = MyBytesIO()
1959 t = self.TextIOWrapper(b, encoding="ascii")
1960 t.write("abc")
1961 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001962 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001963 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001964
1965 def test_override_destructor(self):
1966 record = []
1967 class MyTextIO(self.TextIOWrapper):
1968 def __del__(self):
1969 record.append(1)
1970 try:
1971 f = super().__del__
1972 except AttributeError:
1973 pass
1974 else:
1975 f()
1976 def close(self):
1977 record.append(2)
1978 super().close()
1979 def flush(self):
1980 record.append(3)
1981 super().flush()
1982 b = self.BytesIO()
1983 t = MyTextIO(b, encoding="ascii")
1984 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001985 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001986 self.assertEqual(record, [1, 2, 3])
1987
1988 def test_error_through_destructor(self):
1989 # Test that the exception state is not modified by a destructor,
1990 # even if close() fails.
1991 rawio = self.CloseFailureIO()
1992 def f():
1993 self.TextIOWrapper(rawio).xyzzy
1994 with support.captured_output("stderr") as s:
1995 self.assertRaises(AttributeError, f)
1996 s = s.getvalue().strip()
1997 if s:
1998 # The destructor *may* have printed an unraisable error, check it
1999 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002000 self.assertTrue(s.startswith("Exception IOError: "), s)
2001 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00002002
Guido van Rossum9b76da62007-04-11 01:09:03 +00002003 # Systematic tests of the text I/O API
2004
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002005 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002006 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2007 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002008 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002009 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002010 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002011 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002012 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002013 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002014 self.assertEqual(f.tell(), 0)
2015 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002016 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002017 self.assertEqual(f.seek(0), 0)
2018 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002019 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002020 self.assertEqual(f.read(2), "ab")
2021 self.assertEqual(f.read(1), "c")
2022 self.assertEqual(f.read(1), "")
2023 self.assertEqual(f.read(), "")
2024 self.assertEqual(f.tell(), cookie)
2025 self.assertEqual(f.seek(0), 0)
2026 self.assertEqual(f.seek(0, 2), cookie)
2027 self.assertEqual(f.write("def"), 3)
2028 self.assertEqual(f.seek(cookie), cookie)
2029 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002030 if enc.startswith("utf"):
2031 self.multi_line_test(f, enc)
2032 f.close()
2033
2034 def multi_line_test(self, f, enc):
2035 f.seek(0)
2036 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002037 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002038 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002039 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002040 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002041 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002042 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002043 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002044 wlines.append((f.tell(), line))
2045 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002046 f.seek(0)
2047 rlines = []
2048 while True:
2049 pos = f.tell()
2050 line = f.readline()
2051 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002052 break
2053 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002054 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002055
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002056 def test_telling(self):
2057 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002058 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002059 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002060 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002061 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002062 p2 = f.tell()
2063 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002064 self.assertEqual(f.tell(), p0)
2065 self.assertEqual(f.readline(), "\xff\n")
2066 self.assertEqual(f.tell(), p1)
2067 self.assertEqual(f.readline(), "\xff\n")
2068 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002069 f.seek(0)
2070 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002071 self.assertEqual(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002072 self.assertRaises(IOError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002073 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002074 f.close()
2075
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002076 def test_seeking(self):
2077 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002078 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002079 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002080 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002081 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002082 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002083 suffix = bytes(u_suffix.encode("utf-8"))
2084 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002085 with self.open(support.TESTFN, "wb") as f:
2086 f.write(line*2)
2087 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2088 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002089 self.assertEqual(s, str(prefix, "ascii"))
2090 self.assertEqual(f.tell(), prefix_size)
2091 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002092
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002093 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002094 # Regression test for a specific bug
2095 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002096 with self.open(support.TESTFN, "wb") as f:
2097 f.write(data)
2098 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2099 f._CHUNK_SIZE # Just test that it exists
2100 f._CHUNK_SIZE = 2
2101 f.readline()
2102 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002103
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002104 def test_seek_and_tell(self):
2105 #Test seek/tell using the StatefulIncrementalDecoder.
2106 # Make test faster by doing smaller seeks
2107 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002108
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002109 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002110 """Tell/seek to various points within a data stream and ensure
2111 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002112 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002113 f.write(data)
2114 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002115 f = self.open(support.TESTFN, encoding='test_decoder')
2116 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002117 decoded = f.read()
2118 f.close()
2119
Neal Norwitze2b07052008-03-18 19:52:05 +00002120 for i in range(min_pos, len(decoded) + 1): # seek positions
2121 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002122 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002123 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002124 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002125 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002126 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002127 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002128 f.close()
2129
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002130 # Enable the test decoder.
2131 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002132
2133 # Run the tests.
2134 try:
2135 # Try each test case.
2136 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002137 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002138
2139 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002140 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2141 offset = CHUNK_SIZE - len(input)//2
2142 prefix = b'.'*offset
2143 # Don't bother seeking into the prefix (takes too long).
2144 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002145 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002146
2147 # Ensure our test decoder won't interfere with subsequent tests.
2148 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002149 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002150
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002151 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002152 data = "1234567890"
2153 tests = ("utf-16",
2154 "utf-16-le",
2155 "utf-16-be",
2156 "utf-32",
2157 "utf-32-le",
2158 "utf-32-be")
2159 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002160 buf = self.BytesIO()
2161 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002162 # Check if the BOM is written only once (see issue1753).
2163 f.write(data)
2164 f.write(data)
2165 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002166 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002167 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002168 self.assertEqual(f.read(), data * 2)
2169 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002170
Benjamin Petersona1b49012009-03-31 23:11:32 +00002171 def test_unreadable(self):
2172 class UnReadable(self.BytesIO):
2173 def readable(self):
2174 return False
2175 txt = self.TextIOWrapper(UnReadable())
2176 self.assertRaises(IOError, txt.read)
2177
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002178 def test_read_one_by_one(self):
2179 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002180 reads = ""
2181 while True:
2182 c = txt.read(1)
2183 if not c:
2184 break
2185 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002186 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002187
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002188 def test_readlines(self):
2189 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2190 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2191 txt.seek(0)
2192 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2193 txt.seek(0)
2194 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2195
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002196 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002197 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002198 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002199 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002200 reads = ""
2201 while True:
2202 c = txt.read(128)
2203 if not c:
2204 break
2205 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002206 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002207
2208 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002209 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002210
2211 # read one char at a time
2212 reads = ""
2213 while True:
2214 c = txt.read(1)
2215 if not c:
2216 break
2217 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002218 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002219
2220 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002221 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002222 txt._CHUNK_SIZE = 4
2223
2224 reads = ""
2225 while True:
2226 c = txt.read(4)
2227 if not c:
2228 break
2229 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002230 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002231
2232 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002233 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002234 txt._CHUNK_SIZE = 4
2235
2236 reads = txt.read(4)
2237 reads += txt.read(4)
2238 reads += txt.readline()
2239 reads += txt.readline()
2240 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002241 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002242
2243 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002244 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002245 txt._CHUNK_SIZE = 4
2246
2247 reads = txt.read(4)
2248 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002249 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002250
2251 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002252 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002253 txt._CHUNK_SIZE = 4
2254
2255 reads = txt.read(4)
2256 pos = txt.tell()
2257 txt.seek(0)
2258 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002259 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002260
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002261 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002262 buffer = self.BytesIO(self.testdata)
2263 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002264
2265 self.assertEqual(buffer.seekable(), txt.seekable())
2266
Antoine Pitroue4501852009-05-14 18:55:55 +00002267 def test_append_bom(self):
2268 # The BOM is not written again when appending to a non-empty file
2269 filename = support.TESTFN
2270 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2271 with self.open(filename, 'w', encoding=charset) as f:
2272 f.write('aaa')
2273 pos = f.tell()
2274 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002275 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002276
2277 with self.open(filename, 'a', encoding=charset) as f:
2278 f.write('xxx')
2279 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002280 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002281
2282 def test_seek_bom(self):
2283 # Same test, but when seeking manually
2284 filename = support.TESTFN
2285 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2286 with self.open(filename, 'w', encoding=charset) as f:
2287 f.write('aaa')
2288 pos = f.tell()
2289 with self.open(filename, 'r+', encoding=charset) as f:
2290 f.seek(pos)
2291 f.write('zzz')
2292 f.seek(0)
2293 f.write('bbb')
2294 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002295 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002296
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002297 def test_errors_property(self):
2298 with self.open(support.TESTFN, "w") as f:
2299 self.assertEqual(f.errors, "strict")
2300 with self.open(support.TESTFN, "w", errors="replace") as f:
2301 self.assertEqual(f.errors, "replace")
2302
Victor Stinner45df8202010-04-28 22:31:17 +00002303 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002304 def test_threads_write(self):
2305 # Issue6750: concurrent writes could duplicate data
2306 event = threading.Event()
2307 with self.open(support.TESTFN, "w", buffering=1) as f:
2308 def run(n):
2309 text = "Thread%03d\n" % n
2310 event.wait()
2311 f.write(text)
2312 threads = [threading.Thread(target=lambda n=x: run(n))
2313 for x in range(20)]
2314 for t in threads:
2315 t.start()
2316 time.sleep(0.02)
2317 event.set()
2318 for t in threads:
2319 t.join()
2320 with self.open(support.TESTFN) as f:
2321 content = f.read()
2322 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002323 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002324
Antoine Pitrou6be88762010-05-03 16:48:20 +00002325 def test_flush_error_on_close(self):
2326 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2327 def bad_flush():
2328 raise IOError()
2329 txt.flush = bad_flush
2330 self.assertRaises(IOError, txt.close) # exception not swallowed
2331
2332 def test_multi_close(self):
2333 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2334 txt.close()
2335 txt.close()
2336 txt.close()
2337 self.assertRaises(ValueError, txt.flush)
2338
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002339 def test_unseekable(self):
2340 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2341 self.assertRaises(self.UnsupportedOperation, txt.tell)
2342 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2343
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002344 def test_readonly_attributes(self):
2345 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2346 buf = self.BytesIO(self.testdata)
2347 with self.assertRaises(AttributeError):
2348 txt.buffer = buf
2349
Antoine Pitroue96ec682011-07-23 21:46:35 +02002350 def test_rawio(self):
2351 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
2352 # that subprocess.Popen() can have the required unbuffered
2353 # semantics with universal_newlines=True.
2354 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2355 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
2356 # Reads
2357 self.assertEqual(txt.read(4), 'abcd')
2358 self.assertEqual(txt.readline(), 'efghi\n')
2359 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
2360
2361 def test_rawio_write_through(self):
2362 # Issue #12591: with write_through=True, writes don't need a flush
2363 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
2364 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
2365 write_through=True)
2366 txt.write('1')
2367 txt.write('23\n4')
2368 txt.write('5')
2369 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
2370
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002371class CTextIOWrapperTest(TextIOWrapperTest):
2372
2373 def test_initialization(self):
2374 r = self.BytesIO(b"\xc3\xa9\n\n")
2375 b = self.BufferedReader(r, 1000)
2376 t = self.TextIOWrapper(b)
2377 self.assertRaises(TypeError, t.__init__, b, newline=42)
2378 self.assertRaises(ValueError, t.read)
2379 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2380 self.assertRaises(ValueError, t.read)
2381
2382 def test_garbage_collection(self):
2383 # C TextIOWrapper objects are collected, and collecting them flushes
2384 # all data to disk.
2385 # The Python version has __del__, so it ends in gc.garbage instead.
2386 rawio = io.FileIO(support.TESTFN, "wb")
2387 b = self.BufferedWriter(rawio)
2388 t = self.TextIOWrapper(b, encoding="ascii")
2389 t.write("456def")
2390 t.x = t
2391 wr = weakref.ref(t)
2392 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002393 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002394 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002395 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002396 self.assertEqual(f.read(), b"456def")
2397
2398class PyTextIOWrapperTest(TextIOWrapperTest):
2399 pass
2400
2401
2402class IncrementalNewlineDecoderTest(unittest.TestCase):
2403
2404 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002405 # UTF-8 specific tests for a newline decoder
2406 def _check_decode(b, s, **kwargs):
2407 # We exercise getstate() / setstate() as well as decode()
2408 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002409 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002410 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002411 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002412
Antoine Pitrou180a3362008-12-14 16:36:46 +00002413 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002414
Antoine Pitrou180a3362008-12-14 16:36:46 +00002415 _check_decode(b'\xe8', "")
2416 _check_decode(b'\xa2', "")
2417 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002418
Antoine Pitrou180a3362008-12-14 16:36:46 +00002419 _check_decode(b'\xe8', "")
2420 _check_decode(b'\xa2', "")
2421 _check_decode(b'\x88', "\u8888")
2422
2423 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002424 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2425
Antoine Pitrou180a3362008-12-14 16:36:46 +00002426 decoder.reset()
2427 _check_decode(b'\n', "\n")
2428 _check_decode(b'\r', "")
2429 _check_decode(b'', "\n", final=True)
2430 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002431
Antoine Pitrou180a3362008-12-14 16:36:46 +00002432 _check_decode(b'\r', "")
2433 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002434
Antoine Pitrou180a3362008-12-14 16:36:46 +00002435 _check_decode(b'\r\r\n', "\n\n")
2436 _check_decode(b'\r', "")
2437 _check_decode(b'\r', "\n")
2438 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002439
Antoine Pitrou180a3362008-12-14 16:36:46 +00002440 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2441 _check_decode(b'\xe8\xa2\x88', "\u8888")
2442 _check_decode(b'\n', "\n")
2443 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2444 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002445
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002446 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002447 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002448 if encoding is not None:
2449 encoder = codecs.getincrementalencoder(encoding)()
2450 def _decode_bytewise(s):
2451 # Decode one byte at a time
2452 for b in encoder.encode(s):
2453 result.append(decoder.decode(bytes([b])))
2454 else:
2455 encoder = None
2456 def _decode_bytewise(s):
2457 # Decode one char at a time
2458 for c in s:
2459 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002460 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002461 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002462 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002463 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002464 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002465 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002466 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002467 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002468 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002469 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002470 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002471 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002472 input = "abc"
2473 if encoder is not None:
2474 encoder.reset()
2475 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002476 self.assertEqual(decoder.decode(input), "abc")
2477 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002478
2479 def test_newline_decoder(self):
2480 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002481 # None meaning the IncrementalNewlineDecoder takes unicode input
2482 # rather than bytes input
2483 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002484 'utf-16', 'utf-16-le', 'utf-16-be',
2485 'utf-32', 'utf-32-le', 'utf-32-be',
2486 )
2487 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002488 decoder = enc and codecs.getincrementaldecoder(enc)()
2489 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2490 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002491 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002492 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2493 self.check_newline_decoding_utf8(decoder)
2494
Antoine Pitrou66913e22009-03-06 23:40:56 +00002495 def test_newline_bytes(self):
2496 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2497 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002498 self.assertEqual(dec.newlines, None)
2499 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2500 self.assertEqual(dec.newlines, None)
2501 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2502 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002503 dec = self.IncrementalNewlineDecoder(None, translate=False)
2504 _check(dec)
2505 dec = self.IncrementalNewlineDecoder(None, translate=True)
2506 _check(dec)
2507
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002508class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2509 pass
2510
2511class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2512 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002513
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002514
Guido van Rossum01a27522007-03-07 01:00:12 +00002515# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002516
Guido van Rossum5abbf752007-08-27 17:39:33 +00002517class MiscIOTest(unittest.TestCase):
2518
Barry Warsaw40e82462008-11-20 20:14:50 +00002519 def tearDown(self):
2520 support.unlink(support.TESTFN)
2521
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002522 def test___all__(self):
2523 for name in self.io.__all__:
2524 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002525 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002526 if name == "open":
2527 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002528 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002529 self.assertTrue(issubclass(obj, Exception), name)
2530 elif not name.startswith("SEEK_"):
2531 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002532
Barry Warsaw40e82462008-11-20 20:14:50 +00002533 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002534 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002535 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002536 f.close()
2537
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002538 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002539 self.assertEqual(f.name, support.TESTFN)
2540 self.assertEqual(f.buffer.name, support.TESTFN)
2541 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2542 self.assertEqual(f.mode, "U")
2543 self.assertEqual(f.buffer.mode, "rb")
2544 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002545 f.close()
2546
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002547 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002548 self.assertEqual(f.mode, "w+")
2549 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2550 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002551
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002552 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002553 self.assertEqual(g.mode, "wb")
2554 self.assertEqual(g.raw.mode, "wb")
2555 self.assertEqual(g.name, f.fileno())
2556 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002557 f.close()
2558 g.close()
2559
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002560 def test_io_after_close(self):
2561 for kwargs in [
2562 {"mode": "w"},
2563 {"mode": "wb"},
2564 {"mode": "w", "buffering": 1},
2565 {"mode": "w", "buffering": 2},
2566 {"mode": "wb", "buffering": 0},
2567 {"mode": "r"},
2568 {"mode": "rb"},
2569 {"mode": "r", "buffering": 1},
2570 {"mode": "r", "buffering": 2},
2571 {"mode": "rb", "buffering": 0},
2572 {"mode": "w+"},
2573 {"mode": "w+b"},
2574 {"mode": "w+", "buffering": 1},
2575 {"mode": "w+", "buffering": 2},
2576 {"mode": "w+b", "buffering": 0},
2577 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002578 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002579 f.close()
2580 self.assertRaises(ValueError, f.flush)
2581 self.assertRaises(ValueError, f.fileno)
2582 self.assertRaises(ValueError, f.isatty)
2583 self.assertRaises(ValueError, f.__iter__)
2584 if hasattr(f, "peek"):
2585 self.assertRaises(ValueError, f.peek, 1)
2586 self.assertRaises(ValueError, f.read)
2587 if hasattr(f, "read1"):
2588 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002589 if hasattr(f, "readall"):
2590 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002591 if hasattr(f, "readinto"):
2592 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2593 self.assertRaises(ValueError, f.readline)
2594 self.assertRaises(ValueError, f.readlines)
2595 self.assertRaises(ValueError, f.seek, 0)
2596 self.assertRaises(ValueError, f.tell)
2597 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002598 self.assertRaises(ValueError, f.write,
2599 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002600 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002601 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002602
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002603 def test_blockingioerror(self):
2604 # Various BlockingIOError issues
2605 self.assertRaises(TypeError, self.BlockingIOError)
2606 self.assertRaises(TypeError, self.BlockingIOError, 1)
2607 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2608 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2609 b = self.BlockingIOError(1, "")
2610 self.assertEqual(b.characters_written, 0)
2611 class C(str):
2612 pass
2613 c = C("")
2614 b = self.BlockingIOError(1, c)
2615 c.b = b
2616 b.c = c
2617 wr = weakref.ref(c)
2618 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002619 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002620 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002621
2622 def test_abcs(self):
2623 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002624 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2625 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2626 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2627 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002628
2629 def _check_abc_inheritance(self, abcmodule):
2630 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002631 self.assertIsInstance(f, abcmodule.IOBase)
2632 self.assertIsInstance(f, abcmodule.RawIOBase)
2633 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2634 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002635 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002636 self.assertIsInstance(f, abcmodule.IOBase)
2637 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2638 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2639 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002640 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002641 self.assertIsInstance(f, abcmodule.IOBase)
2642 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2643 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2644 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002645
2646 def test_abc_inheritance(self):
2647 # Test implementations inherit from their respective ABCs
2648 self._check_abc_inheritance(self)
2649
2650 def test_abc_inheritance_official(self):
2651 # Test implementations inherit from the official ABCs of the
2652 # baseline "io" module.
2653 self._check_abc_inheritance(io)
2654
Antoine Pitroue033e062010-10-29 10:38:18 +00002655 def _check_warn_on_dealloc(self, *args, **kwargs):
2656 f = open(*args, **kwargs)
2657 r = repr(f)
2658 with self.assertWarns(ResourceWarning) as cm:
2659 f = None
2660 support.gc_collect()
2661 self.assertIn(r, str(cm.warning.args[0]))
2662
2663 def test_warn_on_dealloc(self):
2664 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2665 self._check_warn_on_dealloc(support.TESTFN, "wb")
2666 self._check_warn_on_dealloc(support.TESTFN, "w")
2667
2668 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2669 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002670 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002671 for fd in fds:
2672 try:
2673 os.close(fd)
2674 except EnvironmentError as e:
2675 if e.errno != errno.EBADF:
2676 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002677 self.addCleanup(cleanup_fds)
2678 r, w = os.pipe()
2679 fds += r, w
2680 self._check_warn_on_dealloc(r, *args, **kwargs)
2681 # When using closefd=False, there's no warning
2682 r, w = os.pipe()
2683 fds += r, w
2684 with warnings.catch_warnings(record=True) as recorded:
2685 open(r, *args, closefd=False, **kwargs)
2686 support.gc_collect()
2687 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002688
2689 def test_warn_on_dealloc_fd(self):
2690 self._check_warn_on_dealloc_fd("rb", buffering=0)
2691 self._check_warn_on_dealloc_fd("rb")
2692 self._check_warn_on_dealloc_fd("r")
2693
2694
Antoine Pitrou243757e2010-11-05 21:15:39 +00002695 def test_pickling(self):
2696 # Pickling file objects is forbidden
2697 for kwargs in [
2698 {"mode": "w"},
2699 {"mode": "wb"},
2700 {"mode": "wb", "buffering": 0},
2701 {"mode": "r"},
2702 {"mode": "rb"},
2703 {"mode": "rb", "buffering": 0},
2704 {"mode": "w+"},
2705 {"mode": "w+b"},
2706 {"mode": "w+b", "buffering": 0},
2707 ]:
2708 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2709 with self.open(support.TESTFN, **kwargs) as f:
2710 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2711
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002712class CMiscIOTest(MiscIOTest):
2713 io = io
2714
2715class PyMiscIOTest(MiscIOTest):
2716 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002717
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002718
2719@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2720class SignalsTest(unittest.TestCase):
2721
2722 def setUp(self):
2723 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2724
2725 def tearDown(self):
2726 signal.signal(signal.SIGALRM, self.oldalrm)
2727
2728 def alarm_interrupt(self, sig, frame):
2729 1/0
2730
2731 @unittest.skipUnless(threading, 'Threading required for this test.')
Victor Stinnercd1aa0d2011-07-04 11:48:17 +02002732 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2733 'issue #12429: skip test on FreeBSD <= 7')
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002734 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2735 """Check that a partial write, when it gets interrupted, properly
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002736 invokes the signal handler, and bubbles up the exception raised
2737 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002738 read_results = []
2739 def _read():
2740 s = os.read(r, 1)
2741 read_results.append(s)
2742 t = threading.Thread(target=_read)
2743 t.daemon = True
2744 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00002745 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002746 try:
2747 wio = self.io.open(w, **fdopen_kwargs)
2748 t.start()
2749 signal.alarm(1)
2750 # Fill the pipe enough that the write will be blocking.
2751 # It will be interrupted by the timer armed above. Since the
2752 # other thread has read one byte, the low-level write will
2753 # return with a successful (partial) result rather than an EINTR.
2754 # The buffered IO layer must check for pending signal
2755 # handlers, which in this case will invoke alarm_interrupt().
2756 self.assertRaises(ZeroDivisionError,
2757 wio.write, item * (1024 * 1024))
2758 t.join()
2759 # We got one byte, get another one and check that it isn't a
2760 # repeat of the first one.
2761 read_results.append(os.read(r, 1))
2762 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2763 finally:
2764 os.close(w)
2765 os.close(r)
2766 # This is deliberate. If we didn't close the file descriptor
2767 # before closing wio, wio would try to flush its internal
2768 # buffer, and block again.
2769 try:
2770 wio.close()
2771 except IOError as e:
2772 if e.errno != errno.EBADF:
2773 raise
2774
2775 def test_interrupted_write_unbuffered(self):
2776 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2777
2778 def test_interrupted_write_buffered(self):
2779 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2780
2781 def test_interrupted_write_text(self):
2782 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2783
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002784 def check_reentrant_write(self, data, **fdopen_kwargs):
2785 def on_alarm(*args):
2786 # Will be called reentrantly from the same thread
2787 wio.write(data)
2788 1/0
2789 signal.signal(signal.SIGALRM, on_alarm)
2790 r, w = os.pipe()
2791 wio = self.io.open(w, **fdopen_kwargs)
2792 try:
2793 signal.alarm(1)
2794 # Either the reentrant call to wio.write() fails with RuntimeError,
2795 # or the signal handler raises ZeroDivisionError.
2796 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2797 while 1:
2798 for i in range(100):
2799 wio.write(data)
2800 wio.flush()
2801 # Make sure the buffer doesn't fill up and block further writes
2802 os.read(r, len(data) * 100)
2803 exc = cm.exception
2804 if isinstance(exc, RuntimeError):
2805 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2806 finally:
2807 wio.close()
2808 os.close(r)
2809
2810 def test_reentrant_write_buffered(self):
2811 self.check_reentrant_write(b"xy", mode="wb")
2812
2813 def test_reentrant_write_text(self):
2814 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2815
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002816 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2817 """Check that a buffered read, when it gets interrupted (either
2818 returning a partial result or EINTR), properly invokes the signal
2819 handler and retries if the latter returned successfully."""
2820 r, w = os.pipe()
2821 fdopen_kwargs["closefd"] = False
2822 def alarm_handler(sig, frame):
2823 os.write(w, b"bar")
2824 signal.signal(signal.SIGALRM, alarm_handler)
2825 try:
2826 rio = self.io.open(r, **fdopen_kwargs)
2827 os.write(w, b"foo")
2828 signal.alarm(1)
2829 # Expected behaviour:
2830 # - first raw read() returns partial b"foo"
2831 # - second raw read() returns EINTR
2832 # - third raw read() returns b"bar"
2833 self.assertEqual(decode(rio.read(6)), "foobar")
2834 finally:
2835 rio.close()
2836 os.close(w)
2837 os.close(r)
2838
Antoine Pitrou20db5112011-08-19 20:32:34 +02002839 def test_interrupted_read_retry_buffered(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002840 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2841 mode="rb")
2842
Antoine Pitrou20db5112011-08-19 20:32:34 +02002843 def test_interrupted_read_retry_text(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002844 self.check_interrupted_read_retry(lambda x: x,
2845 mode="r")
2846
2847 @unittest.skipUnless(threading, 'Threading required for this test.')
2848 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2849 """Check that a buffered write, when it gets interrupted (either
2850 returning a partial result or EINTR), properly invokes the signal
2851 handler and retries if the latter returned successfully."""
2852 select = support.import_module("select")
2853 # A quantity that exceeds the buffer size of an anonymous pipe's
2854 # write end.
2855 N = 1024 * 1024
2856 r, w = os.pipe()
2857 fdopen_kwargs["closefd"] = False
2858 # We need a separate thread to read from the pipe and allow the
2859 # write() to finish. This thread is started after the SIGALRM is
2860 # received (forcing a first EINTR in write()).
2861 read_results = []
2862 write_finished = False
2863 def _read():
2864 while not write_finished:
2865 while r in select.select([r], [], [], 1.0)[0]:
2866 s = os.read(r, 1024)
2867 read_results.append(s)
2868 t = threading.Thread(target=_read)
2869 t.daemon = True
2870 def alarm1(sig, frame):
2871 signal.signal(signal.SIGALRM, alarm2)
2872 signal.alarm(1)
2873 def alarm2(sig, frame):
2874 t.start()
2875 signal.signal(signal.SIGALRM, alarm1)
2876 try:
2877 wio = self.io.open(w, **fdopen_kwargs)
2878 signal.alarm(1)
2879 # Expected behaviour:
2880 # - first raw write() is partial (because of the limited pipe buffer
2881 # and the first alarm)
2882 # - second raw write() returns EINTR (because of the second alarm)
2883 # - subsequent write()s are successful (either partial or complete)
2884 self.assertEqual(N, wio.write(item * N))
2885 wio.flush()
2886 write_finished = True
2887 t.join()
2888 self.assertEqual(N, sum(len(x) for x in read_results))
2889 finally:
2890 write_finished = True
2891 os.close(w)
2892 os.close(r)
2893 # This is deliberate. If we didn't close the file descriptor
2894 # before closing wio, wio would try to flush its internal
2895 # buffer, and could block (in case of failure).
2896 try:
2897 wio.close()
2898 except IOError as e:
2899 if e.errno != errno.EBADF:
2900 raise
2901
Antoine Pitrou20db5112011-08-19 20:32:34 +02002902 def test_interrupted_write_retry_buffered(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002903 self.check_interrupted_write_retry(b"x", mode="wb")
2904
Antoine Pitrou20db5112011-08-19 20:32:34 +02002905 def test_interrupted_write_retry_text(self):
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002906 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2907
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002908
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002909class CSignalsTest(SignalsTest):
2910 io = io
2911
2912class PySignalsTest(SignalsTest):
2913 io = pyio
2914
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002915 # Handling reentrancy issues would slow down _pyio even more, so the
2916 # tests are disabled.
2917 test_reentrant_write_buffered = None
2918 test_reentrant_write_text = None
2919
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002920
Guido van Rossum28524c72007-02-27 05:47:44 +00002921def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002922 tests = (CIOTest, PyIOTest,
2923 CBufferedReaderTest, PyBufferedReaderTest,
2924 CBufferedWriterTest, PyBufferedWriterTest,
2925 CBufferedRWPairTest, PyBufferedRWPairTest,
2926 CBufferedRandomTest, PyBufferedRandomTest,
2927 StatefulIncrementalDecoderTest,
2928 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2929 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002930 CMiscIOTest, PyMiscIOTest,
2931 CSignalsTest, PySignalsTest,
2932 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002933
2934 # Put the namespaces of the IO module we are testing and some useful mock
2935 # classes in the __dict__ of each test.
2936 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00002937 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002938 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2939 c_io_ns = {name : getattr(io, name) for name in all_members}
2940 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2941 globs = globals()
2942 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2943 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2944 # Avoid turning open into a bound method.
2945 py_io_ns["open"] = pyio.OpenWrapper
2946 for test in tests:
2947 if test.__name__.startswith("C"):
2948 for name, obj in c_io_ns.items():
2949 setattr(test, name, obj)
2950 elif test.__name__.startswith("Py"):
2951 for name, obj in py_io_ns.items():
2952 setattr(test, name, obj)
2953
2954 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002955
2956if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002957 test_main()