blob: dac30cbf400ab35ed988e96f4ef1a2240725f289 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000045
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
def _default_chunk_size():
    """Return the _CHUNK_SIZE of a text stream freshly opened on this file."""
    # Any readable text file would do; this source file is always available.
    probe = open(__file__, "r", encoding="latin1")
    try:
        return probe._CHUNK_SIZE
    finally:
        probe.close()
51
52
class MockRawIOWithoutRead:
    """Raw stream mock that deliberately provides no read() method.

    Leaving read() out exercises the default RawIO.read(), which is
    implemented in terms of readinto().
    """

    def __init__(self, read_stack=()):
        # Chunks still to be served by readinto(); a None entry makes a
        # single call return None.
        self._read_stack = list(read_stack)
        # Everything passed to write(), each chunk converted to bytes.
        self._write_stack = []
        # Number of read attempts, and how many of them found nothing queued.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        chunk = bytes(b)
        self._write_stack.append(chunk)
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        # Not a real position, but callers need some return value.
        return 0

    def tell(self):
        # Same caveat as seek(): the value is a placeholder.
        return 0

    def readinto(self, buf):
        """Copy the next queued chunk into buf and return the byte count.

        Returns 0 when nothing is queued and None when the next queued
        entry is None.  A chunk larger than buf is split: buf is filled
        and the remainder stays at the front of the queue.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        pending = self._read_stack[0]
        if pending is None:
            self._read_stack.pop(0)
            return None
        size = len(pending)
        if size > capacity:
            # Serve what fits now and keep the tail for the next call.
            buf[:] = pending[:capacity]
            self._read_stack[0] = pending[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = pending
        return size

    def truncate(self, pos=None):
        return pos
108
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C implementation's RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the Python implementation's RawIOBase."""
114
115
class MockRawIO(MockRawIOWithoutRead):
    """Raw stream mock that adds an explicit read() to the base mock."""

    def read(self, n=None):
        """Return the next queued chunk, ignoring the requested size n.

        Returns b"" and counts an extraneous read once the queue is empty.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue is expected here; anything else
            # (KeyboardInterrupt, a broken mock, ...) must propagate.
            self._extraneous_reads += 1
            return b""
125
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C implementation's RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the Python implementation's RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000131
Guido van Rossuma9e20242007-03-08 00:43:48 +0000132
class MisbehavedRawIO(MockRawIO):
    """Raw stream mock whose methods report deliberately bogus results."""

    def write(self, b):
        # Claim twice as many bytes as the parent reports.
        accepted = super().write(b)
        return accepted * 2

    def read(self, n=None):
        # Hand back the parent's data repeated twice.
        data = super().read(n)
        return data * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Let the parent fill buf, then report a count larger than buf.
        super().readinto(buf)
        return 5 * len(buf)
149
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C implementation's RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the Python implementation's RawIOBase."""
155
156
class CloseFailureIO(MockRawIO):
    """Raw stream mock whose first close() raises IOError."""

    # Falsy until close() has been attempted once.
    closed = 0

    def close(self):
        if self.closed:
            return
        # Mark the stream closed before failing so later calls are no-ops.
        self.closed = 1
        raise IOError
164
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C implementation's RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the Python implementation's RawIOBase."""
170
171
class MockFileIO:
    """Mixin that logs the result size of every read()/readinto() call.

    It must be combined with a stream class that supplies the real
    __init__(data), read() and readinto().
    """

    def __init__(self, data):
        # One entry per read()/readinto() call, in call order.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000187
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO on top of the C implementation's BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO on top of the Python implementation's BytesIO."""
193
194
class MockUnseekableIO:
    """Mixin turning a stream into one that refuses seek() and tell().

    Subclasses must provide an UnsupportedOperation attribute naming the
    exception class to raise.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
204
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO on top of the C implementation's BytesIO."""

    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO on top of the Python implementation's BytesIO."""

    UnsupportedOperation = pyio.UnsupportedOperation
210
211
class MockNonBlockWriterIO:
    """Writable mock that can be told to "block" in the middle of a write.

    Subclasses must provide a BlockingIOError attribute naming the
    exception class raised when the blocker is hit.
    """

    def __init__(self):
        # Chunks accepted so far, drained by pop_written().
        self._write_stack = []
        # Byte sequence that triggers a one-shot simulated block, or None.
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and clear the record."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record b and return its length, or block at the armed blocker.

        When a blocker is armed and occurs in b, only the bytes before
        it are recorded, the blocker is disarmed, and BlockingIOError is
        raised with that prefix length as its third argument.
        """
        data = bytes(b)
        blocker = self._blocker_char
        if blocker:
            cut = data.find(blocker)
            if cut != -1:
                self._blocker_char = None
                self._write_stack.append(data[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(data)
        return len(data)
250
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of the C implementation's RawIOBase."""

    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of the Python implementation's RawIOBase."""

    BlockingIOError = pyio.BlockingIOError
256
Guido van Rossuma9e20242007-03-08 00:43:48 +0000257
class IOTest(unittest.TestCase):
    # Implementation-independent tests.  The names looked up on self (open,
    # FileIO, BytesIO, IOBase, UnsupportedOperation, SEEK_CUR, the mock
    # classes, ...) are not defined in this class; presumably they are
    # attached per implementation elsewhere in the file -- confirm there.
    #
    # Test methods carry comments rather than docstrings so that verbose
    # unittest output keeps showing the method names.

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate sequence against writable f.

        The statement order matters: each assertion depends on the
        position and contents left behind by the previous calls.
        """
        self.assertEqual(f.write(b"blah."), 5)
        # truncate() must not move the file position.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/readinto/seek sequence against readable f.

        f is expected to contain b"hello world\\n".  With buffered=True
        the argument-less read() is exercised as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Only two bytes are left; the buffer keeps its size.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file tests.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Exercise seek/tell/write/truncate around the LARGE offset on f."""
        # Use unittest assertions: bare asserts vanish under "python -O".
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # truncate() must not move the file position.
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        exc = self.UnsupportedOperation
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(exc, fp.read)
                self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "wb", buffering=0) as fp:
            self.assertRaises(exc, fp.read)
            self.assertRaises(exc, fp.readline)
        with self.open(support.TESTFN, "rb", buffering=0) as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(exc, fp.write, b"blah")
            self.assertRaises(exc, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(exc, fp.write, "blah")
            self.assertRaises(exc, fp.writelines, ["blah\n"])
            # Non-zero seeking from current or end pos
            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a real skip instead of printing and silently
                # passing.
                self.skipTest(
                    "requires %d bytes and a long time on %s; use "
                    "'regrtest.py -u largefile test_io' to run it"
                    % (self.LARGE, sys.platform))
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The file must be closed even when the body raises.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            # Destruction must go through __del__, close() and flush(),
            # in that order.
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check that destroying a subclass of base calls __del__, close()
        and flush() in that order, with instance attributes still usable."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        # write() must report the byte count, not the element count.
        n = len(a.tobytes())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r",
                          closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            try:
                self.assertEqual(file.buffer.raw.closefd, False)
            finally:
                # Close the second wrapper explicitly; with closefd=False
                # this leaves f's descriptor open.
                file.close()

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Create a reference cycle so only the GC can reclaim f.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed

    def test_multi_close(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        # Closing repeatedly must be harmless.
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")
612
class CIOTest(IOTest):
    """IOTest variant intended for the C implementation of io."""


class PyIOTest(IOTest):
    """IOTest variant intended for the Python implementation of io."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000618
Guido van Rossuma9e20242007-03-08 00:43:48 +0000619
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000620class CommonBufferedTests:
621 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
622
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000623 def test_detach(self):
624 raw = self.MockRawIO()
625 buf = self.tp(raw)
626 self.assertIs(buf.detach(), raw)
627 self.assertRaises(ValueError, buf.detach)
628
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000629 def test_fileno(self):
630 rawio = self.MockRawIO()
631 bufio = self.tp(rawio)
632
Ezio Melottib3aedd42010-11-20 19:04:17 +0000633 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000634
635 def test_no_fileno(self):
636 # XXX will we always have fileno() function? If so, kill
637 # this test. Else, write it.
638 pass
639
640 def test_invalid_args(self):
641 rawio = self.MockRawIO()
642 bufio = self.tp(rawio)
643 # Invalid whence
644 self.assertRaises(ValueError, bufio.seek, 0, -1)
645 self.assertRaises(ValueError, bufio.seek, 0, 3)
646
647 def test_override_destructor(self):
648 tp = self.tp
649 record = []
650 class MyBufferedIO(tp):
651 def __del__(self):
652 record.append(1)
653 try:
654 f = super().__del__
655 except AttributeError:
656 pass
657 else:
658 f()
659 def close(self):
660 record.append(2)
661 super().close()
662 def flush(self):
663 record.append(3)
664 super().flush()
665 rawio = self.MockRawIO()
666 bufio = MyBufferedIO(rawio)
667 writable = bufio.writable()
668 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000669 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000670 if writable:
671 self.assertEqual(record, [1, 2, 3])
672 else:
673 self.assertEqual(record, [1, 2])
674
675 def test_context_manager(self):
676 # Test usability as a context manager
677 rawio = self.MockRawIO()
678 bufio = self.tp(rawio)
679 def _with():
680 with bufio:
681 pass
682 _with()
683 # bufio should now be closed, and using it a second time should raise
684 # a ValueError.
685 self.assertRaises(ValueError, _with)
686
687 def test_error_through_destructor(self):
688 # Test that the exception state is not modified by a destructor,
689 # even if close() fails.
690 rawio = self.CloseFailureIO()
691 def f():
692 self.tp(rawio).xyzzy
693 with support.captured_output("stderr") as s:
694 self.assertRaises(AttributeError, f)
695 s = s.getvalue().strip()
696 if s:
697 # The destructor *may* have printed an unraisable error, check it
698 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000699 self.assertTrue(s.startswith("Exception IOError: "), s)
700 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000701
Antoine Pitrou716c4442009-05-23 19:04:03 +0000702 def test_repr(self):
703 raw = self.MockRawIO()
704 b = self.tp(raw)
705 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
706 self.assertEqual(repr(b), "<%s>" % clsname)
707 raw.name = "dummy"
708 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
709 raw.name = b"dummy"
710 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
711
def test_flush_error_on_close(self):
    # An IOError raised by the raw object's flush() must propagate out
    # of close() instead of being swallowed.
    def failing_flush():
        raise IOError()

    raw = self.MockRawIO()
    raw.flush = failing_flush
    buffered = self.tp(raw)
    with self.assertRaises(IOError):
        buffered.close()
719
def test_multi_close(self):
    # close() may be called repeatedly; flush() on the closed object
    # raises ValueError.
    buffered = self.tp(self.MockRawIO())
    for _ in range(3):
        buffered.close()
    with self.assertRaises(ValueError):
        buffered.flush()
727
def test_unseekable(self):
    # tell() and seek() on top of an unseekable raw stream raise
    # UnsupportedOperation.
    raw = self.MockUnseekableIO(b"A" * 10)
    bufio = self.tp(raw)
    with self.assertRaises(self.UnsupportedOperation):
        bufio.tell()
    with self.assertRaises(self.UnsupportedOperation):
        bufio.seek(0)
732
def test_readonly_attributes(self):
    # The ``raw`` attribute of a buffered object cannot be rebound.
    buffered = self.tp(self.MockRawIO())
    replacement = self.MockRawIO()
    self.assertRaises(AttributeError, setattr, buffered, "raw", replacement)
739
Guido van Rossum78892e42007-04-06 17:31:18 +0000740
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    """BufferedReader tests shared by the C and pure-Python implementations.

    Subclasses bind ``tp`` to the concrete class being exercised.
    """

    read_mode = "rb"

    def test_constructor(self):
        # __init__ can be re-run with valid buffer sizes; non-positive
        # sizes raise ValueError.
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        reader.__init__(raw)
        reader.__init__(raw, buffer_size=1024)
        reader.__init__(raw, buffer_size=16)
        self.assertEqual(b"abc", reader.read())
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, reader.__init__, raw,
                              buffer_size=bad_size)
        raw = self.MockRawIO([b"abc"])
        reader.__init__(raw)
        self.assertEqual(b"abc", reader.read())

    def test_read(self):
        for size in (None, 7):
            reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
            self.assertEqual(b"abcdefg", reader.read(size))
        # Invalid args (checked on the reader left over from the loop)
        self.assertRaises(ValueError, reader.read, -2)

    def test_read1(self):
        raw = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        self.assertEqual(b"a", reader.read(1))
        # Each step: (size given to read1, expected bytes,
        #             expected value of raw._reads afterwards)
        steps = (
            (1, b"b", 1),
            (100, b"c", 1),
            (100, b"d", 2),
            (100, b"efg", 3),
            (100, b"", 4),
        )
        for size, expected, raw_reads in steps:
            self.assertEqual(expected, reader.read1(size))
            self.assertEqual(raw._reads, raw_reads)
        # Invalid args
        self.assertRaises(ValueError, reader.read1, -1)

    def test_readinto(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        target = bytearray(2)
        # Each step: (byte count returned, contents of target afterwards).
        # A short or empty read leaves the untouched tail of target as is.
        steps = (
            (2, b"ab"),
            (2, b"cd"),
            (2, b"ef"),
            (1, b"gf"),
            (0, b"gf"),
        )
        for count_read, contents in steps:
            self.assertEqual(reader.readinto(target), count_read)
            self.assertEqual(target, contents)

    def test_readlines(self):
        def fresh_reader():
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

        every_line = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(fresh_reader().readlines(), every_line)
        self.assertEqual(fresh_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(fresh_reader().readlines(None), every_line)

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each case: (buffer size, sizes passed to the buffered read(),
        #             sizes expected in the raw object's read history)
        cases = (
            (100, (3, 1, 4, 8), [dlen, 0]),
            (100, (3, 3, 3), [dlen]),
            (4, (1, 2, 4, 2), [4, 4, 1]),
        )

        for bufsize, buf_read_sizes, raw_read_sizes in cases:
            raw = self.MockFileIO(data)
            reader = self.tp(raw, buffer_size=bufsize)
            offset = 0
            for nbytes in buf_read_sizes:
                expected = data[offset:offset + nbytes]
                self.assertEqual(reader.read(nbytes), expected)
                offset += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(raw.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        chunks = (b"abc", b"d", None, b"efg", None, None, None)
        reader = self.tp(self.MockRawIO(chunks))

        self.assertEqual(b"abcd", reader.read(6))
        self.assertEqual(b"e", reader.read(1))
        self.assertEqual(b"fg", reader.read())
        self.assertEqual(b"", reader.peek(1))
        self.assertTrue(reader.read() is None)
        self.assertEqual(b"", reader.read())

    def test_read_past_eof(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read(9000))

    def test_read_all(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode,
                           buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []

                def reader_thread():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for size in cycle([1, 19]):
                            chunk = bufio.read(size)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                threads = [threading.Thread(target=reader_thread)
                           for _ in range(20)]
                for thread in threads:
                    thread.start()
                time.sleep(0.02)  # yield
                for thread in threads:
                    thread.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                everything = b''.join(results)
                for value in range(256):
                    needle = bytes(bytearray([value]))
                    self.assertEqual(everything.count(needle), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        self.assertRaises(IOError, reader.seek, 0)
        self.assertRaises(IOError, reader.tell)

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            wanted = b"x" * n
            # Simple case: one raw read is enough to satisfy the request.
            # A more complex case: two raw reads are needed to satisfy
            # the request.
            for raw_chunks in ([wanted], [b"x" * (n - 1), b"x"]):
                raw = self.MockRawIO(raw_chunks)
                reader = self.tp(raw, bufsize)
                self.assertEqual(reader.read(n), wanted)
                self.assertEqual(raw._extraneous_reads, 0,
                    "failed for {}: {} != 0".format(
                        n, raw._extraneous_reads))
919
920
class CBufferedReaderTest(BufferedReaderTest):
    """Runs the BufferedReader tests against the C implementation."""

    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed __init__, read() on the object raises ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading differently, so
        # checking this there is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends up in gc.garbage instead.
        # Opening with "w+b" creates TESTFN on disk; remove it when the test
        # finishes so it does not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Create a reference cycle so only the cyclic GC can free the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000961
class PyBufferedReaderTest(BufferedReaderTest):
    """Runs the BufferedReader tests against the pure-Python implementation."""

    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000964
Guido van Rossuma9e20242007-03-08 00:43:48 +0000965
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    """BufferedWriter tests shared by the C and pure-Python implementations.

    Subclasses bind ``tp`` to the concrete class being exercised.
    """

    write_mode = "wb"

    def test_constructor(self):
        # __init__ can be re-run with valid buffer sizes; non-positive
        # sizes raise ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() pushes pending buffered data down to the raw object.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for start in range(0, len(contents), 3):
            bufio.write(contents[start:start + 3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every single write.
        contents = bytes(range(256)) * 1000
        total = len(contents)
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)

        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for _ in range(15):
                    yield size

        sizes = gen_sizes()
        pos = 0
        while pos < total:
            size = min(next(sizes), total - pos)
            self.assertEqual(bufio.write(contents[pos:pos + size]), size)
            intermediate_func(bufio)
            pos += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)

        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Destroying the buffered object flushes pending data to the raw one.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates TESTFN on disk; remove it afterwards so it does
        # not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode,
                           buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []

                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02)  # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Every byte value must appear exactly N times in the file.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument (max_buffer_size) must emit
        # a DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001181
1182
class CBufferedWriterTest(BufferedWriterTest):
    """Runs the BufferedWriter tests against the C implementation."""

    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed __init__, write() on the object raises ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends up in gc.garbage instead.
        # The test creates TESTFN on disk; remove it afterwards so it does
        # not leak into later tests.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Create a reference cycle so only the cyclic GC can free the object.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1220
1221
class PyBufferedWriterTest(BufferedWriterTest):
    """Runs the BufferedWriter tests against the pure-Python implementation."""

    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001224
class BufferedRWPairTest(unittest.TestCase):
    """BufferedRWPair tests shared by the C and pure-Python implementations.

    Subclasses bind ``tp`` to the concrete class being exercised.
    """

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument (max_buffer_size) must emit
        # a DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # A hint of None must behave like no hint at all.  (This line used
        # to repeat the no-argument call above verbatim.)
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")

    def test_readinto(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        data = bytearray(5)
        self.assertEqual(pair.readinto(data), 5)
        self.assertEqual(data, b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        pair.write(b"def")
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() does not advance the position: the read() below returns
        # the same leading bytes.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side is one.
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        for reader_tty, writer_tty in ((True, False),
                                       (False, True),
                                       (True, True)):
            pair = self.tp(SelectableIsAtty(reader_tty),
                           SelectableIsAtty(writer_tty))
            self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001342
class CBufferedRWPairTest(BufferedRWPairTest):
    """Runs the BufferedRWPair tests against the C implementation."""

    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001345
class PyBufferedRWPairTest(BufferedRWPairTest):
    """Runs the BufferedRWPair tests against the pure-Python implementation."""

    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001348
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001349
1350class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1351 read_mode = "rb+"
1352 write_mode = "wb+"
1353
def test_constructor(self):
    # Run the constructor checks of both parent test classes, reader
    # first and writer second.
    for parent in (BufferedReaderTest, BufferedWriterTest):
        parent.test_constructor(self)
1357
def test_read_and_write(self):
    raw = self.MockRawIO((b"asdf", b"ghjk"))
    stream = self.tp(raw, 8)

    self.assertEqual(b"as", stream.read(2))
    for piece in (b"ddd", b"eee"):
        stream.write(piece)
    self.assertFalse(raw._write_stack)  # Buffer writes
    # The read below is what pushes the buffered writes to the raw object.
    self.assertEqual(b"ghjk", stream.read())
    self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001368
def test_seek_and_tell(self):
    stream = self.tp(self.BytesIO(b"asdfghjkl"))

    self.assertEqual(b"as", stream.read(2))
    self.assertEqual(2, stream.tell())
    stream.seek(0, 0)
    self.assertEqual(b"asdf", stream.read(4))

    # Overwrite bytes 4..7, then re-read the whole stream from the start.
    stream.write(b"asdf")
    stream.seek(0, 0)
    self.assertEqual(b"asdfasdfl", stream.read())
    self.assertEqual(9, stream.tell())

    # Seek relative to the end, then relative to the current position.
    stream.seek(-4, 2)
    self.assertEqual(5, stream.tell())
    stream.seek(2, 1)
    self.assertEqual(7, stream.tell())
    self.assertEqual(b"fl", stream.read(11))

    # A float offset is rejected.
    self.assertRaises(TypeError, stream.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001388
def check_flush_and_read(self, read_func):
    # Shared scenario for the test_flush_and_* tests.  read_func is
    # called as read_func(bufio, n) or read_func(bufio) and must return
    # the bytes it consumed from bufio.
    raw = self.BytesIO(b"abcdefghi")
    stream = self.tp(raw)

    self.assertEqual(b"ab", read_func(stream, 2))
    stream.write(b"12")
    self.assertEqual(b"ef", read_func(stream, 2))
    self.assertEqual(6, stream.tell())
    stream.flush()
    self.assertEqual(6, stream.tell())
    self.assertEqual(b"ghi", read_func(stream))

    # Modify the raw stream behind the buffered object's back.
    raw.seek(0, 0)
    raw.write(b"XYZ")
    # flush() resets the read buffer
    stream.flush()
    stream.seek(0, 0)
    self.assertEqual(b"XYZ", read_func(stream, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001406
def test_flush_and_read(self):
    # Run the flush/read scenario through plain read().
    def _read(bufio, *args):
        return bufio.read(*args)
    self.check_flush_and_read(_read)
1409
def test_flush_and_readinto(self):
    # Run the flush/read scenario through readinto().
    def _readinto(bufio, n=-1):
        # A negative n means "read the rest": use an oversized buffer.
        capacity = 9999 if n < 0 else n
        target = bytearray(capacity)
        filled = bufio.readinto(target)
        return bytes(target[:filled])
    self.check_flush_and_read(_readinto)
1416
def test_flush_and_peek(self):
    # Run the flush/read scenario through peek() followed by a seek.
    def _peek(bufio, n=-1):
        # This relies on the fact that the buffer can contain the whole
        # raw stream, otherwise peek() can return less.
        peeked = bufio.peek(n)
        chunk = peeked if n == -1 else peeked[:n]
        # peek() does not advance, so consume the bytes by seeking forward.
        bufio.seek(len(chunk), 1)
        return chunk
    self.check_flush_and_read(_peek)
1427
1428 def test_flush_and_write(self):
1429 raw = self.BytesIO(b"abcdefghi")
1430 bufio = self.tp(raw)
1431
1432 bufio.write(b"123")
1433 bufio.flush()
1434 bufio.write(b"45")
1435 bufio.flush()
1436 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001437 self.assertEqual(b"12345fghi", raw.getvalue())
1438 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001439
def test_threads(self):
    # Run the reader threading test and then the writer threading test
    # against this class's stream type.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_threads(self)
1443
def test_writes_and_peek(self):
    # Run check_writes twice: once peeking at the current position, once
    # peeking one byte back.
    def _peek_in_place(bufio):
        bufio.peek(1)
    self.check_writes(_peek_in_place)

    def _peek_previous_byte(bufio):
        # Step back one byte, peek, then return to where we were.
        saved = bufio.tell()
        bufio.seek(-1, 1)
        bufio.peek(1)
        bufio.seek(saved, 0)
    self.check_writes(_peek_previous_byte)
1454
def test_writes_and_reads(self):
    # Run check_writes with a callback that re-reads the previous byte
    # via read().
    def _reread_last_byte(bufio):
        bufio.seek(-1, 1)
        bufio.read(1)
    self.check_writes(_reread_last_byte)
1460
def test_writes_and_read1s(self):
    # Run check_writes with a callback that re-reads the previous byte
    # via read1().
    def _reread_last_byte(bufio):
        bufio.seek(-1, 1)
        bufio.read1(1)
    self.check_writes(_reread_last_byte)
1466
def test_writes_and_readintos(self):
    # Run check_writes with a callback that re-reads the previous byte
    # via readinto() into a throwaway one-byte buffer.
    def _reread_last_byte(bufio):
        bufio.seek(-1, 1)
        scratch = bytearray(1)
        bufio.readinto(scratch)
    self.check_writes(_reread_last_byte)
1472
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001473 def test_write_after_readahead(self):
1474 # Issue #6629: writing after the buffer was filled by readahead should
1475 # first rewind the raw stream.
1476 for overwrite_size in [1, 5]:
1477 raw = self.BytesIO(b"A" * 10)
1478 bufio = self.tp(raw, 4)
1479 # Trigger readahead
1480 self.assertEqual(bufio.read(1), b"A")
1481 self.assertEqual(bufio.tell(), 1)
1482 # Overwriting should rewind the raw stream if it needs so
1483 bufio.write(b"B" * overwrite_size)
1484 self.assertEqual(bufio.tell(), overwrite_size + 1)
1485 # If the write size was smaller than the buffer size, flush() and
1486 # check that rewind happens.
1487 bufio.flush()
1488 self.assertEqual(bufio.tell(), overwrite_size + 1)
1489 s = raw.getvalue()
1490 self.assertEqual(s,
1491 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1492
Antoine Pitrou7c404892011-05-13 00:13:33 +02001493 def test_write_rewind_write(self):
1494 # Various combinations of reading / writing / seeking backwards / writing again
1495 def mutate(bufio, pos1, pos2):
1496 assert pos2 >= pos1
1497 # Fill the buffer
1498 bufio.seek(pos1)
1499 bufio.read(pos2 - pos1)
1500 bufio.write(b'\x02')
1501 # This writes earlier than the previous write, but still inside
1502 # the buffer.
1503 bufio.seek(pos1)
1504 bufio.write(b'\x01')
1505
1506 b = b"\x80\x81\x82\x83\x84"
1507 for i in range(0, len(b)):
1508 for j in range(i, len(b)):
1509 raw = self.BytesIO(b)
1510 bufio = self.tp(raw, 100)
1511 mutate(bufio, i, j)
1512 bufio.flush()
1513 expected = bytearray(b)
1514 expected[j] = 2
1515 expected[i] = 1
1516 self.assertEqual(raw.getvalue(), expected,
1517 "failed result for i=%d, j=%d" % (i, j))
1518
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001519 def test_truncate_after_read_or_write(self):
1520 raw = self.BytesIO(b"A" * 10)
1521 bufio = self.tp(raw, 100)
1522 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1523 self.assertEqual(bufio.truncate(), 2)
1524 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1525 self.assertEqual(bufio.truncate(), 4)
1526
def test_misbehaved_io(self):
    # Run the reader misbehaved-io test and then the writer one against
    # this class's stream type.
    for base in (BufferedReaderTest, BufferedWriterTest):
        base.test_misbehaved_io(self)
1530
# You can't construct a BufferedRandom over a non-seekable stream.
# Binding the name to None means no test_unseekable method is available on
# this class.
# NOTE(review): presumably this masks a test_unseekable inherited from the
# reader/writer test bases -- confirm against the class header.
test_unseekable = None
1533
class CBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandom tests against io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with a buffer size of sys.maxsize must fail.
        acceptable = (OverflowError, MemoryError, ValueError)
        self.assertRaises(acceptable, bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Run the reader and then the writer garbage-collection test.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1550
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandom tests against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1553
1554
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001555# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1556# properties:
1557# - A single output character can correspond to many bytes of input.
1558# - The number of input bytes to complete the character can be
1559# undetermined until the last input byte is received.
1560# - The number of input bytes can vary depending on previous input.
1561# - A single input byte can correspond to many characters of output.
1562# - The number of output characters can be undetermined until the
1563# last input byte is received.
1564# - The number of output characters can vary depending on previous input.
1565
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    A stateful, buffering decoder for exercising seek/tell.

    The input is a stream of words.  A word is either fixed-length (I bytes
    long) or, when I is 0, variable-length and terminated by a period;
    periods with no buffered word in front of them are ignored.  Words are
    interpreted as follows:
      - 'i' followed by a number sets the input length, I (maximum 99).
        Setting I to 0 selects period-terminated words.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is emitted followed by a period.  The emitted word
        is the input word truncated or padded with hyphens to exactly O
        characters; when O is 0 it is emitted verbatim.
    I and O both start at 1.  A word still buffered at EOF (final=True) is
    processed as if it had been terminated.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        # Back to the initial state: I=1, O=1, nothing buffered.
        self.buffer = bytearray()
        self.i = 1
        self.o = 1

    def getstate(self):
        # Pack I and O into one integer.  Each is XORed with 1 so that
        # the flags are 0 right after reset().
        packed_i = self.i ^ 1
        packed_o = self.o ^ 1
        return bytes(self.buffer), packed_i * 100 + packed_o

    def setstate(self, state):
        # Inverse of getstate().
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        pieces = []
        period = ord('.')
        for byte in input:
            if self.i == 0 and byte == period:
                # variable-length mode: a period ends the buffered word,
                # and is dropped when nothing is buffered
                if self.buffer:
                    pieces.append(self.process_word())
                continue
            self.buffer.append(byte)
            # fixed-length mode: the word ends after self.i bytes
            if self.i != 0 and len(self.buffer) == self.i:
                pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        # Interpret the buffered word, clear the buffer and return the
        # text it produces ('' for the 'i' and 'o' control words).
        marker = self.buffer[0]
        if marker == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
            result = ''
        elif marker == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
            result = ''
        else:
            result = self.buffer.decode('ascii')
            result = result.ljust(self.o, '-')  # pad out with hyphens
            if self.o:
                result = result[:self.o]  # truncate to output length
            result += '.'
        self.buffer = bytearray()
        return result

    # The codec lookup below answers only while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        # Codec search function: resolves 'test_decoder' to this class
        # while enabled, and returns None for everything else.
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
1650
# Register the previous decoder for testing.
# Disabled by default, tests will enable it: the lookup function only
# answers for the name 'test_decoder', and only while
# StatefulIncrementalDecoder.codecEnabled is true.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1654
1655
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Sanity checks for StatefulIncrementalDecoder itself.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.'),
    ]

    def test_decoder(self):
        # Decode every tabulated case in a single call.
        for raw, at_eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(raw, at_eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001698
1699class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001700
def setUp(self):
    # Sample bytes mixing \r\n, \r and \n line endings, and the same text
    # with every line ending written as \n.
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
    # Make sure no scratch file is left over before the test starts.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001705
def tearDown(self):
    # Remove the scratch file a test may have created.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001708
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001709 def test_constructor(self):
1710 r = self.BytesIO(b"\xc3\xa9\n\n")
1711 b = self.BufferedReader(r, 1000)
1712 t = self.TextIOWrapper(b)
1713 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001714 self.assertEqual(t.encoding, "latin1")
1715 self.assertEqual(t.line_buffering, False)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001716 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001717 self.assertEqual(t.encoding, "utf8")
1718 self.assertEqual(t.line_buffering, True)
1719 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001720 self.assertRaises(TypeError, t.__init__, b, newline=42)
1721 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1722
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001723 def test_detach(self):
1724 r = self.BytesIO()
1725 b = self.BufferedWriter(r)
1726 t = self.TextIOWrapper(b)
1727 self.assertIs(t.detach(), b)
1728
1729 t = self.TextIOWrapper(b, encoding="ascii")
1730 t.write("howdy")
1731 self.assertFalse(r.getvalue())
1732 t.detach()
1733 self.assertEqual(r.getvalue(), b"howdy")
1734 self.assertRaises(ValueError, t.detach)
1735
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001736 def test_repr(self):
1737 raw = self.BytesIO("hello".encode("utf-8"))
1738 b = self.BufferedReader(raw)
1739 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001740 modname = self.TextIOWrapper.__module__
1741 self.assertEqual(repr(t),
1742 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1743 raw.name = "dummy"
1744 self.assertEqual(repr(t),
1745 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001746 t.mode = "r"
1747 self.assertEqual(repr(t),
1748 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001749 raw.name = b"dummy"
1750 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001751 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001752
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001753 def test_line_buffering(self):
1754 r = self.BytesIO()
1755 b = self.BufferedWriter(r, 1000)
1756 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001757 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001758 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001759 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001760 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001761 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001762 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001763
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001764 def test_encoding(self):
1765 # Check the encoding attribute is always set, and valid
1766 b = self.BytesIO()
1767 t = self.TextIOWrapper(b, encoding="utf8")
1768 self.assertEqual(t.encoding, "utf8")
1769 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001770 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001771 codecs.lookup(t.encoding)
1772
1773 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001774 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001775 b = self.BytesIO(b"abc\n\xff\n")
1776 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001777 self.assertRaises(UnicodeError, t.read)
1778 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001779 b = self.BytesIO(b"abc\n\xff\n")
1780 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001781 self.assertRaises(UnicodeError, t.read)
1782 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001783 b = self.BytesIO(b"abc\n\xff\n")
1784 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001785 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001786 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001787 b = self.BytesIO(b"abc\n\xff\n")
1788 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001789 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001790
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001791 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001792 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001793 b = self.BytesIO()
1794 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001795 self.assertRaises(UnicodeError, t.write, "\xff")
1796 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001797 b = self.BytesIO()
1798 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001799 self.assertRaises(UnicodeError, t.write, "\xff")
1800 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001801 b = self.BytesIO()
1802 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001803 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001804 t.write("abc\xffdef\n")
1805 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001806 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001807 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001808 b = self.BytesIO()
1809 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001810 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001811 t.write("abc\xffdef\n")
1812 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001813 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001814
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001815 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001816 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1817
1818 tests = [
1819 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001820 [ '', input_lines ],
1821 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1822 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1823 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001824 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001825 encodings = (
1826 'utf-8', 'latin-1',
1827 'utf-16', 'utf-16-le', 'utf-16-be',
1828 'utf-32', 'utf-32-le', 'utf-32-be',
1829 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001830
Guido van Rossum8358db22007-08-18 21:39:55 +00001831 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001832 # character in TextIOWrapper._pending_line.
1833 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001834 # XXX: str.encode() should return bytes
1835 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001836 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001837 for bufsize in range(1, 10):
1838 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001839 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1840 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001841 encoding=encoding)
1842 if do_reads:
1843 got_lines = []
1844 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001845 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001846 if c2 == '':
1847 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00001848 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001849 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001850 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001851 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001852
1853 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001854 self.assertEqual(got_line, exp_line)
1855 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001856
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001857 def test_newlines_input(self):
1858 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001859 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1860 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001861 (None, normalized.decode("ascii").splitlines(True)),
1862 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001863 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1864 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1865 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001866 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001867 buf = self.BytesIO(testdata)
1868 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001869 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00001870 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001871 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001872
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001873 def test_newlines_output(self):
1874 testdict = {
1875 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1876 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1877 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1878 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1879 }
1880 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1881 for newline, expected in tests:
1882 buf = self.BytesIO()
1883 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1884 txt.write("AAA\nB")
1885 txt.write("BB\nCCC\n")
1886 txt.write("X\rY\r\nZ")
1887 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001888 self.assertEqual(buf.closed, False)
1889 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001890
def test_destructor(self):
    # Dropping the last reference to a wrapper must flush pending text
    # and close the underlying stream.
    seen_on_close = []
    base = self.BytesIO

    class MyBytesIO(base):
        def close(self):
            # Record what had been written by the time close() ran.
            seen_on_close.append(self.getvalue())
            base.close(self)

    sink = MyBytesIO()
    wrapper = self.TextIOWrapper(sink, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_on_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001904
def test_override_destructor(self):
    # A subclass overriding __del__, close() and flush() must see them
    # called in that order when the object is collected.
    record = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            # Chain to the parent finalizer only if it has one.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            record.append(2)
            super().close()

        def flush(self):
            record.append(3)
            super().flush()

    sink = self.BytesIO()
    wrapper = MyTextIO(sink, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
1927
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def touch_missing_attribute():
        # The temporary wrapper is discarded while AttributeError is
        # propagating.
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00001942
Guido van Rossum9b76da62007-04-11 01:09:03 +00001943 # Systematic tests of the text I/O API
1944
def test_basic_io(self):
    # Write, read, seek and tell on a real text file under a range of
    # decoder chunk sizes and encodings.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8":  # , "utf-16-be", "utf-16-le":
            # Context managers close the file even when an assertion
            # fails part-way, so no handle on TESTFN is leaked.
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position cookie for end-of-file, reused below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append, then read the appended text back from the cookie.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
1973
def multi_line_test(self, f, enc):
    # Rewrite f with lines of assorted lengths, recording tell() before
    # each write, then read them back and check that both the positions
    # and the text match.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    period = len(sample)
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        line = "".join(sample[k % period] for k in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    # tell() is sampled before each readline(); an empty line means EOF.
    pos, line = f.tell(), f.readline()
    while line:
        rlines.append((pos, line))
        pos, line = f.tell(), f.readline()
    self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001995
def test_telling(self):
    # tell() must report the same positions while reading lines back as
    # it did while writing them, and must refuse to work during
    # iteration.
    # The context manager closes the file even if an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is not available while iterating over the file.
            self.assertRaises(IOError, f.tell)
        # Once iteration is over, tell() works again.
        self.assertEqual(f.tell(), p2)
2015
def test_seeking(self):
    # Place a multi-byte character two bytes before the default chunk
    # size and check read(), tell() and readline() around it.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    # The prefix is pure ASCII, so characters and bytes count the same.
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = u_suffix.encode("utf-8")
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002032
def test_seeking_too(self):
    # Regression test for a specific bug
    # The file holds one three-byte UTF-8 character and a newline; with a
    # chunk size of 2 the character cannot arrive in a single chunk.
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        # Only checks that tell() completes; the return value is unused.
        f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002043
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent.

        data is the raw bytes written to the test file; min_pos is the
        first character position at which seeking is attempted.
        """
        # Every file is opened through a context manager so that a failing
        # assertion cannot leave a handle open on support.TESTFN.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Going back to the cookie must replay the same text.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(data)

        # Position each test case so that it crosses a chunk boundary.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(data)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + data, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002090
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002091 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002092 data = "1234567890"
2093 tests = ("utf-16",
2094 "utf-16-le",
2095 "utf-16-be",
2096 "utf-32",
2097 "utf-32-le",
2098 "utf-32-be")
2099 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002100 buf = self.BytesIO()
2101 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002102 # Check if the BOM is written only once (see issue1753).
2103 f.write(data)
2104 f.write(data)
2105 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002106 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002107 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002108 self.assertEqual(f.read(), data * 2)
2109 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002110
Benjamin Petersona1b49012009-03-31 23:11:32 +00002111 def test_unreadable(self):
2112 class UnReadable(self.BytesIO):
2113 def readable(self):
2114 return False
2115 txt = self.TextIOWrapper(UnReadable())
2116 self.assertRaises(IOError, txt.read)
2117
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002118 def test_read_one_by_one(self):
2119 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002120 reads = ""
2121 while True:
2122 c = txt.read(1)
2123 if not c:
2124 break
2125 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002126 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002127
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002128 def test_readlines(self):
2129 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2130 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2131 txt.seek(0)
2132 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2133 txt.seek(0)
2134 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2135
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002136 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002137 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002138 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002139 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002140 reads = ""
2141 while True:
2142 c = txt.read(128)
2143 if not c:
2144 break
2145 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002146 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002147
2148 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002149 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002150
2151 # read one char at a time
2152 reads = ""
2153 while True:
2154 c = txt.read(1)
2155 if not c:
2156 break
2157 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002158 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002159
2160 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002161 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002162 txt._CHUNK_SIZE = 4
2163
2164 reads = ""
2165 while True:
2166 c = txt.read(4)
2167 if not c:
2168 break
2169 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002170 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002171
2172 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002173 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002174 txt._CHUNK_SIZE = 4
2175
2176 reads = txt.read(4)
2177 reads += txt.read(4)
2178 reads += txt.readline()
2179 reads += txt.readline()
2180 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002181 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002182
2183 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002184 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002185 txt._CHUNK_SIZE = 4
2186
2187 reads = txt.read(4)
2188 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002189 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002190
2191 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002192 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002193 txt._CHUNK_SIZE = 4
2194
2195 reads = txt.read(4)
2196 pos = txt.tell()
2197 txt.seek(0)
2198 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002199 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002200
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002201 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002202 buffer = self.BytesIO(self.testdata)
2203 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002204
2205 self.assertEqual(buffer.seekable(), txt.seekable())
2206
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN

    def bytes_on_disk():
        # Raw file content, read back in binary mode.
        with self.open(path, 'rb') as reader:
            return reader.read()

    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as writer:
            writer.write('aaa')
            writer.tell()
        self.assertEqual(bytes_on_disk(), 'aaa'.encode(codec))

        with self.open(path, 'a', encoding=codec) as appender:
            appender.write('xxx')
        self.assertEqual(bytes_on_disk(), 'aaaxxx'.encode(codec))
Antoine Pitroue4501852009-05-14 18:55:55 +00002221
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as writer:
            writer.write('aaa')
            end_of_text = writer.tell()
        with self.open(path, 'r+', encoding=codec) as updater:
            # Append at the old end first, then overwrite from the start.
            updater.seek(end_of_text)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(path, 'rb') as reader:
            on_disk = reader.read()
        self.assertEqual(on_disk, 'bbbzzz'.encode(codec))
Antoine Pitroue4501852009-05-14 18:55:55 +00002236
def test_errors_property(self):
    # (extra keyword arguments for open(), expected value of .errors)
    scenarios = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra, expected in scenarios:
        with self.open(support.TESTFN, "w", **extra) as f:
            self.assertEqual(f.errors, expected)
2242
@unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start_gate = threading.Event()
    worker_ids = range(20)
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def emit(n):
            line = "Thread%03d\n" % n
            # Hold every worker until the gate opens so the writes race.
            start_gate.wait()
            f.write(line)

        workers = [threading.Thread(target=emit, args=(n,))
                   for n in worker_ids]
        for worker in workers:
            worker.start()
        time.sleep(0.02)
        start_gate.set()
        for worker in workers:
            worker.join()
    with self.open(support.TESTFN) as f:
        content = f.read()
        for n in worker_ids:
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002264
Antoine Pitrou6be88762010-05-03 16:48:20 +00002265 def test_flush_error_on_close(self):
2266 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2267 def bad_flush():
2268 raise IOError()
2269 txt.flush = bad_flush
2270 self.assertRaises(IOError, txt.close) # exception not swallowed
2271
2272 def test_multi_close(self):
2273 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2274 txt.close()
2275 txt.close()
2276 txt.close()
2277 self.assertRaises(ValueError, txt.flush)
2278
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002279 def test_unseekable(self):
2280 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2281 self.assertRaises(self.UnsupportedOperation, txt.tell)
2282 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2283
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002284 def test_readonly_attributes(self):
2285 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2286 buf = self.BytesIO(self.testdata)
2287 with self.assertRaises(AttributeError):
2288 txt.buffer = buf
2289
class CTextIOWrapperTest(TextIOWrapperTest):

    def test_initialization(self):
        # After a rejected __init__() call the wrapper must refuse to read.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        rejected = ((42, TypeError), ('xyzzy', ValueError))
        for bad_newline, expected_error in rejected:
            with self.assertRaises(expected_error):
                wrapper.__init__(buffered, newline=bad_newline)
            with self.assertRaises(ValueError):
                wrapper.read()

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Self-reference: the wrapper is now part of a reference cycle.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        collected = ref() is None
        self.assertTrue(collected, ref)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
2316
class PyTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapperTest variant; adds no tests of its own."""
2319
2320
2321class IncrementalNewlineDecoderTest(unittest.TestCase):
2322
2323 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002324 # UTF-8 specific tests for a newline decoder
2325 def _check_decode(b, s, **kwargs):
2326 # We exercise getstate() / setstate() as well as decode()
2327 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002328 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002329 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002330 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002331
Antoine Pitrou180a3362008-12-14 16:36:46 +00002332 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002333
Antoine Pitrou180a3362008-12-14 16:36:46 +00002334 _check_decode(b'\xe8', "")
2335 _check_decode(b'\xa2', "")
2336 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002337
Antoine Pitrou180a3362008-12-14 16:36:46 +00002338 _check_decode(b'\xe8', "")
2339 _check_decode(b'\xa2', "")
2340 _check_decode(b'\x88', "\u8888")
2341
2342 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002343 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2344
Antoine Pitrou180a3362008-12-14 16:36:46 +00002345 decoder.reset()
2346 _check_decode(b'\n', "\n")
2347 _check_decode(b'\r', "")
2348 _check_decode(b'', "\n", final=True)
2349 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002350
Antoine Pitrou180a3362008-12-14 16:36:46 +00002351 _check_decode(b'\r', "")
2352 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002353
Antoine Pitrou180a3362008-12-14 16:36:46 +00002354 _check_decode(b'\r\r\n', "\n\n")
2355 _check_decode(b'\r', "")
2356 _check_decode(b'\r', "\n")
2357 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002358
Antoine Pitrou180a3362008-12-14 16:36:46 +00002359 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2360 _check_decode(b'\xe8\xa2\x88', "\u8888")
2361 _check_decode(b'\n', "\n")
2362 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2363 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002364
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002365 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002366 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002367 if encoding is not None:
2368 encoder = codecs.getincrementalencoder(encoding)()
2369 def _decode_bytewise(s):
2370 # Decode one byte at a time
2371 for b in encoder.encode(s):
2372 result.append(decoder.decode(bytes([b])))
2373 else:
2374 encoder = None
2375 def _decode_bytewise(s):
2376 # Decode one char at a time
2377 for c in s:
2378 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002379 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002380 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002381 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002382 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002383 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002384 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002385 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002386 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002387 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002388 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002389 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002390 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002391 input = "abc"
2392 if encoder is not None:
2393 encoder.reset()
2394 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002395 self.assertEqual(decoder.decode(input), "abc")
2396 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002397
2398 def test_newline_decoder(self):
2399 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002400 # None meaning the IncrementalNewlineDecoder takes unicode input
2401 # rather than bytes input
2402 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002403 'utf-16', 'utf-16-le', 'utf-16-be',
2404 'utf-32', 'utf-32-le', 'utf-32-be',
2405 )
2406 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002407 decoder = enc and codecs.getincrementaldecoder(enc)()
2408 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2409 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002410 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002411 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2412 self.check_newline_decoding_utf8(decoder)
2413
Antoine Pitrou66913e22009-03-06 23:40:56 +00002414 def test_newline_bytes(self):
2415 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2416 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002417 self.assertEqual(dec.newlines, None)
2418 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2419 self.assertEqual(dec.newlines, None)
2420 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2421 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002422 dec = self.IncrementalNewlineDecoder(None, translate=False)
2423 _check(dec)
2424 dec = self.IncrementalNewlineDecoder(None, translate=True)
2425 _check(dec)
2426
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest variant; adds no tests of its own."""
2429
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest variant; adds no tests of its own."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00002432
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002433
Guido van Rossum01a27522007-03-07 01:00:12 +00002434# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002435
class MiscIOTest(unittest.TestCase):
    # Miscellaneous checks: module namespace, attributes of opened files,
    # behaviour after close(), ABC inheritance, dealloc warnings, pickling.
    # The module under test is read from self.io; self.open and the class
    # attributes such as self.IOBase are set outside this class
    # (presumably by the test runner setup -- TODO confirm).

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test___all__(self):
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        # Each file is opened in a with-block so that a failing assertion
        # cannot leave a handle open on TESTFN before tearDown unlinks it.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second file object sharing f's descriptor; closefd=False
            # keeps it from closing the descriptor that f owns.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        # Every I/O method must raise ValueError once the file is closed.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek/read1/readinto only exist on some of the layers.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Pass bytes to binary files and str to text files.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        # Build a reference cycle between the exception and its argument
        # and check that it is collected.
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertIsInstance(self.IOBase, abc.ABCMeta)
        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        # abcmodule is any object exposing IOBase, RawIOBase, BufferedIOBase
        # and TextIOBase; each kind of open file must be an instance of
        # IOBase and of exactly one of the three more specific ABCs.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "wb") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertIsInstance(f, abcmodule.BufferedIOBase)
            self.assertNotIsInstance(f, abcmodule.TextIOBase)
        with self.open(support.TESTFN, "w") as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            self.assertNotIsInstance(f, abcmodule.RawIOBase)
            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
            self.assertIsInstance(f, abcmodule.TextIOBase)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    def _check_warn_on_dealloc(self, *args, **kwargs):
        # Open a file, drop the only reference without closing it, and check
        # that a ResourceWarning mentioning the file's repr is emitted.
        # NOTE(review): this calls the builtin open() rather than self.open,
        # unlike the other tests in this class -- confirm that is intended.
        f = open(*args, **kwargs)
        r = repr(f)
        with self.assertWarns(ResourceWarning) as cm:
            f = None
            support.gc_collect()
        self.assertIn(r, str(cm.warning.args[0]))

    def test_warn_on_dealloc(self):
        self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
        self._check_warn_on_dealloc(support.TESTFN, "wb")
        self._check_warn_on_dealloc(support.TESTFN, "w")

    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
        # Same check on file objects built from a pipe descriptor.
        fds = []
        def cleanup_fds():
            for fd in fds:
                try:
                    os.close(fd)
                except EnvironmentError as e:
                    # Descriptors already closed by a file object are fine.
                    if e.errno != errno.EBADF:
                        raise
        self.addCleanup(cleanup_fds)
        r, w = os.pipe()
        fds += r, w
        self._check_warn_on_dealloc(r, *args, **kwargs)
        # When using closefd=False, there's no warning
        r, w = os.pipe()
        fds += r, w
        with warnings.catch_warnings(record=True) as recorded:
            # Force ResourceWarning to be recorded; if the active filters
            # ignored it, the check below could never fail.
            warnings.simplefilter("always", ResourceWarning)
            open(r, *args, closefd=False, **kwargs)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_warn_on_dealloc_fd(self):
        self._check_warn_on_dealloc_fd("rb", buffering=0)
        self._check_warn_on_dealloc_fd("rb")
        self._check_warn_on_dealloc_fd("r")

    def test_pickling(self):
        # Pickling file objects is forbidden
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+b", "buffering": 0},
            ]:
            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
                with self.open(support.TESTFN, **kwargs) as f:
                    self.assertRaises(TypeError, pickle.dumps, f, protocol)
2628
class CMiscIOTest(MiscIOTest):
    # Run the MiscIOTest checks against the module bound to the name io.
    io = io
2631
class PyMiscIOTest(MiscIOTest):
    # Run the MiscIOTest checks against the module bound to the name pyio.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002634
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002635
2636@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2637class SignalsTest(unittest.TestCase):
2638
def setUp(self):
    # Route SIGALRM to alarm_interrupt and remember the handler it replaces.
    handler = self.alarm_interrupt
    previous_handler = signal.signal(signal.SIGALRM, handler)
    self.oldalrm = previous_handler
2641
def tearDown(self):
    # Put back the SIGALRM handler that setUp saved in self.oldalrm.
    previous_handler = self.oldalrm
    signal.signal(signal.SIGALRM, previous_handler)
2644
def alarm_interrupt(self, sig, frame):
    # Signal handler: always fails with ZeroDivisionError.
    1 / 0
2647
@unittest.skipUnless(threading, 'Threading required for this test.')
def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
    """Check that a partial write, when it gets interrupted, properly
    invokes the signal handler, and bubbles up the exception raised
    in the latter.

    item is the str or bytes unit that gets written (repeated 1024 * 1024
    times); bytes is the encoded form whose first two bytes are expected on
    the read end of the pipe; fdopen_kwargs are passed on to self.io.open().
    """
    read_results = []
    def _read():
        s = os.read(r, 1)
        read_results.append(s)
    t = threading.Thread(target=_read)
    t.daemon = True
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    # Bound before the try block so that the cleanup below cannot fail with
    # an unbound name (hiding the real error) when open() itself raises.
    wio = None
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        t.start()
        signal.alarm(1)
        # Fill the pipe enough that the write will be blocking.
        # It will be interrupted by the timer armed above.  Since the
        # other thread has read one byte, the low-level write will
        # return with a successful (partial) result rather than an EINTR.
        # The buffered IO layer must check for pending signal
        # handlers, which in this case will invoke alarm_interrupt().
        self.assertRaises(ZeroDivisionError,
                          wio.write, item * (1024 * 1024))
        t.join()
        # We got one byte, get another one and check that it isn't a
        # repeat of the first one.
        read_results.append(os.read(r, 1))
        self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
    finally:
        # Cancel the alarm in case the test failed before it fired, so it
        # cannot go off after the handler has been restored.
        signal.alarm(0)
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and block again.
        if wio is not None:
            try:
                wio.close()
            except IOError as e:
                if e.errno != errno.EBADF:
                    raise
2689
2690 def test_interrupted_write_unbuffered(self):
2691 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2692
2693 def test_interrupted_write_buffered(self):
2694 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2695
2696 def test_interrupted_write_text(self):
2697 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2698
def check_reentrant_write(self, data, **fdopen_kwargs):
    """Check what happens when a signal handler writes to the file that the
    interrupted code is itself busy writing to.

    data is the str or bytes unit written repeatedly; fdopen_kwargs are
    passed on to self.io.open().  The outcome must be either a RuntimeError
    whose message starts with "reentrant call", or the ZeroDivisionError
    raised by the handler.
    """
    def on_alarm(*args):
        # Will be called reentrantly from the same thread
        wio.write(data)
        1/0
    signal.signal(signal.SIGALRM, on_alarm)
    r, w = os.pipe()
    wio = self.io.open(w, **fdopen_kwargs)
    try:
        signal.alarm(1)
        # Either the reentrant call to wio.write() fails with RuntimeError,
        # or the signal handler raises ZeroDivisionError.
        with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
            while 1:
                for i in range(100):
                    wio.write(data)
                    wio.flush()
                # Make sure the buffer doesn't fill up and block further writes
                os.read(r, len(data) * 100)
        exc = cm.exception
        if isinstance(exc, RuntimeError):
            self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
    finally:
        # Cancel the alarm in case the loop failed before it fired, so the
        # handler cannot run against a closed file afterwards.
        signal.alarm(0)
        wio.close()
        os.close(r)
2724
2725 def test_reentrant_write_buffered(self):
2726 self.check_reentrant_write(b"xy", mode="wb")
2727
2728 def test_reentrant_write_text(self):
2729 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2730
def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
    """Check that a buffered read, when it gets interrupted (either
    returning a partial result or EINTR), properly invokes the signal
    handler and retries if the latter returned successfully.

    decode converts the value returned by read() into a str for the
    comparison; fdopen_kwargs are passed on to self.io.open().
    """
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    def alarm_handler(sig, frame):
        os.write(w, b"bar")
    signal.signal(signal.SIGALRM, alarm_handler)
    # Bound before the try block so that the cleanup below cannot fail with
    # an unbound name (hiding the real error) when open() itself raises.
    rio = None
    try:
        rio = self.io.open(r, **fdopen_kwargs)
        os.write(w, b"foo")
        signal.alarm(1)
        # Expected behaviour:
        # - first raw read() returns partial b"foo"
        # - second raw read() returns EINTR
        # - third raw read() returns b"bar"
        self.assertEqual(decode(rio.read(6)), "foobar")
    finally:
        # Cancel the alarm in case the test failed before it fired, so the
        # handler cannot write to an already closed descriptor.
        signal.alarm(0)
        if rio is not None:
            rio.close()
        os.close(w)
        os.close(r)
2753
def test_interrupterd_read_retry_buffered(self):
    # Binary mode: the bytes read back are turned into str via latin-1.
    def decode(raw):
        return raw.decode('latin1')
    self.check_interrupted_read_retry(decode, mode="rb")
2757
def test_interrupterd_read_retry_text(self):
    # Text mode: read() already yields str, so it is passed through as is.
    def decode(text):
        return text
    self.check_interrupted_read_retry(decode, mode="r")
2761
@unittest.skipUnless(threading, 'Threading required for this test.')
def check_interrupted_write_retry(self, item, **fdopen_kwargs):
    """Check that a buffered write, when it gets interrupted (either
    returning a partial result or EINTR), properly invokes the signal
    handler and retries if the latter returned successfully.

    item is a single unit (one byte or one character) that is repeated
    N times to build the payload.  fdopen_kwargs is passed to
    self.io.open(); closefd is forced to False so this helper alone
    owns both pipe descriptors.
    """
    select = support.import_module("select")
    # A quantity that exceeds the buffer size of an anonymous pipe's
    # write end.
    N = 1024 * 1024
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    # We need a separate thread to read from the pipe and allow the
    # write() to finish.  This thread is started after the SIGALRM is
    # received (forcing a first EINTR in write()).
    read_results = []
    write_finished = False
    def _read():
        while not write_finished:
            while r in select.select([r], [], [], 1.0)[0]:
                s = os.read(r, 1024)
                read_results.append(s)
    t = threading.Thread(target=_read)
    t.daemon = True
    def alarm1(sig, frame):
        signal.signal(signal.SIGALRM, alarm2)
        signal.alarm(1)
    def alarm2(sig, frame):
        t.start()
    signal.signal(signal.SIGALRM, alarm1)
    # Bound before the try block so that the cleanup below can tell
    # whether open() succeeded instead of failing on an unbound name.
    wio = None
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        signal.alarm(1)
        # Expected behaviour:
        # - first raw write() is partial (because of the limited pipe buffer
        #   and the first alarm)
        # - second raw write() returns EINTR (because of the second alarm)
        # - subsequent write()s are successful (either partial or complete)
        self.assertEqual(N, wio.write(item * N))
        wio.flush()
        write_finished = True
        t.join()
        self.assertEqual(N, sum(len(x) for x in read_results))
    finally:
        # Cancel any alarm that has not fired yet (alarm1 re-arms one).
        signal.alarm(0)
        write_finished = True
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and could block (in case of failure).
        if wio is not None:
            try:
                wio.close()
            except IOError as e:
                if e.errno != errno.EBADF:
                    raise
2816
def test_interrupterd_write_retry_buffered(self):
    # Binary stream; the payload is built from a single byte.
    unit = b"x"
    self.check_interrupted_write_retry(unit, mode="wb")
2819
def test_interrupterd_write_retry_text(self):
    # Text stream encoded as latin-1; the payload is built from one character.
    unit = "x"
    self.check_interrupted_write_retry(unit, mode="w", encoding="latin1")
2822
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002823
class CSignalsTest(SignalsTest):
    # Run the shared signal tests against the `io` module.
    io = io
2826
class PySignalsTest(SignalsTest):
    # Run the shared signal tests against the `pyio` module.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
2834
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002835
def test_main():
    """Inject the io namespaces and mock classes into the tests, then run them.

    Test classes whose name starts with "C" receive the members of `io`;
    those starting with "Py" receive the members of `pyio`.  Classes with
    neither prefix are left untouched.
    """
    tests = (
        CIOTest, PyIOTest,
        CBufferedReaderTest, PyBufferedReaderTest,
        CBufferedWriterTest, PyBufferedWriterTest,
        CBufferedRWPairTest, PyBufferedRWPairTest,
        CBufferedRandomTest, PyBufferedRandomTest,
        StatefulIncrementalDecoderTest,
        CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
        CTextIOWrapperTest, PyTextIOWrapperTest,
        CMiscIOTest, PyMiscIOTest,
        CSignalsTest, PySignalsTest,
    )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (
        MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
        MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
    )
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = dict((name, getattr(io, name)) for name in all_members)
    py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)

    # Each mock exists in a "C"-prefixed and a "Py"-prefixed flavour at
    # module level; expose the matching one under the unprefixed name.
    globs = globals()
    for mock in mocks:
        c_io_ns[mock.__name__] = globs["C" + mock.__name__]
    for mock in mocks:
        py_io_ns[mock.__name__] = globs["Py" + mock.__name__]

    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        test_name = test.__name__
        if test_name.startswith("C"):
            namespace = c_io_ns
        elif test_name.startswith("Py"):
            namespace = py_io_ns
        else:
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002870
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()