blob: 707b7cb0885485df3f90d66e943f61313d72eb50 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Antoine Pitroub46b9d52010-08-21 19:09:32 +000030import signal
31import errno
Antoine Pitroue033e062010-10-29 10:38:18 +000032import warnings
Antoine Pitrou243757e2010-11-05 21:15:39 +000033import pickle
Georg Brandl1b37e872010-03-14 10:45:50 +000034from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000037
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000038import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000039import io # C implementation of io
40import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000041try:
42 import threading
43except ImportError:
44 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000045
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000047def _default_chunk_size():
48 """Get the default TextIOWrapper chunk size"""
49 with open(__file__, "r", encoding="latin1") as f:
50 return f._CHUNK_SIZE
51
52
Antoine Pitrou328ec742010-09-14 18:37:24 +000053class MockRawIOWithoutRead:
54 """A RawIO implementation without read(), so as to exercise the default
55 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000056
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000057 def __init__(self, read_stack=()):
58 self._read_stack = list(read_stack)
59 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000060 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000061 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000062
Guido van Rossum01a27522007-03-07 01:00:12 +000063 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000064 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000065 return len(b)
66
67 def writable(self):
68 return True
69
Guido van Rossum68bbcd22007-02-27 17:19:33 +000070 def fileno(self):
71 return 42
72
73 def readable(self):
74 return True
75
Guido van Rossum01a27522007-03-07 01:00:12 +000076 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000077 return True
78
Guido van Rossum01a27522007-03-07 01:00:12 +000079 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000080 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000081
82 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000083 return 0 # same comment as above
84
85 def readinto(self, buf):
86 self._reads += 1
87 max_len = len(buf)
88 try:
89 data = self._read_stack[0]
90 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000091 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000092 return 0
93 if data is None:
94 del self._read_stack[0]
95 return None
96 n = len(data)
97 if len(data) <= max_len:
98 del self._read_stack[0]
99 buf[:n] = data
100 return n
101 else:
102 buf[:] = data[:max_len]
103 self._read_stack[0] = data[max_len:]
104 return max_len
105
106 def truncate(self, pos=None):
107 return pos
108
Antoine Pitrou328ec742010-09-14 18:37:24 +0000109class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
110 pass
111
112class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
113 pass
114
115
116class MockRawIO(MockRawIOWithoutRead):
117
118 def read(self, n=None):
119 self._reads += 1
120 try:
121 return self._read_stack.pop(0)
122 except:
123 self._extraneous_reads += 1
124 return b""
125
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000126class CMockRawIO(MockRawIO, io.RawIOBase):
127 pass
128
129class PyMockRawIO(MockRawIO, pyio.RawIOBase):
130 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000131
Guido van Rossuma9e20242007-03-08 00:43:48 +0000132
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000133class MisbehavedRawIO(MockRawIO):
134 def write(self, b):
135 return super().write(b) * 2
136
137 def read(self, n=None):
138 return super().read(n) * 2
139
140 def seek(self, pos, whence):
141 return -123
142
143 def tell(self):
144 return -456
145
146 def readinto(self, buf):
147 super().readinto(buf)
148 return len(buf) * 5
149
150class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
151 pass
152
153class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
154 pass
155
156
157class CloseFailureIO(MockRawIO):
158 closed = 0
159
160 def close(self):
161 if not self.closed:
162 self.closed = 1
163 raise IOError
164
165class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
166 pass
167
168class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
169 pass
170
171
172class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000173
174 def __init__(self, data):
175 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000176 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000177
178 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000179 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000180 self.read_history.append(None if res is None else len(res))
181 return res
182
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000183 def readinto(self, b):
184 res = super().readinto(b)
185 self.read_history.append(res)
186 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000187
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000188class CMockFileIO(MockFileIO, io.BytesIO):
189 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000190
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000191class PyMockFileIO(MockFileIO, pyio.BytesIO):
192 pass
193
194
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000195class MockUnseekableIO:
196 def seekable(self):
197 return False
198
199 def seek(self, *args):
200 raise self.UnsupportedOperation("not seekable")
201
202 def tell(self, *args):
203 raise self.UnsupportedOperation("not seekable")
204
205class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
206 UnsupportedOperation = io.UnsupportedOperation
207
208class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
209 UnsupportedOperation = pyio.UnsupportedOperation
210
211
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000212class MockNonBlockWriterIO:
213
214 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000215 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000216 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000217
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000218 def pop_written(self):
219 s = b"".join(self._write_stack)
220 self._write_stack[:] = []
221 return s
222
223 def block_on(self, char):
224 """Block when a given char is encountered."""
225 self._blocker_char = char
226
227 def readable(self):
228 return True
229
230 def seekable(self):
231 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000232
Guido van Rossum01a27522007-03-07 01:00:12 +0000233 def writable(self):
234 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000235
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000236 def write(self, b):
237 b = bytes(b)
238 n = -1
239 if self._blocker_char:
240 try:
241 n = b.index(self._blocker_char)
242 except ValueError:
243 pass
244 else:
245 self._blocker_char = None
246 self._write_stack.append(b[:n])
247 raise self.BlockingIOError(0, "test blocking", n)
248 self._write_stack.append(b)
249 return len(b)
250
251class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
252 BlockingIOError = io.BlockingIOError
253
254class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
255 BlockingIOError = pyio.BlockingIOError
256
Guido van Rossuma9e20242007-03-08 00:43:48 +0000257
Guido van Rossum28524c72007-02-27 05:47:44 +0000258class IOTest(unittest.TestCase):
259
Neal Norwitze7789b12008-03-24 06:18:09 +0000260 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000261 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000262
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000263 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000264 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000265
Guido van Rossum28524c72007-02-27 05:47:44 +0000266 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000267 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000268 f.truncate(0)
269 self.assertEqual(f.tell(), 5)
270 f.seek(0)
271
272 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000273 self.assertEqual(f.seek(0), 0)
274 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000275 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000276 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000277 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000278 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000279 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000280 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000281 self.assertEqual(f.seek(-1, 2), 13)
282 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000283
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000285 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000286 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000287
Guido van Rossum9b76da62007-04-11 01:09:03 +0000288 def read_ops(self, f, buffered=False):
289 data = f.read(5)
290 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000291 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000292 self.assertEqual(f.readinto(data), 5)
293 self.assertEqual(data, b" worl")
294 self.assertEqual(f.readinto(data), 2)
295 self.assertEqual(len(data), 5)
296 self.assertEqual(data[:2], b"d\n")
297 self.assertEqual(f.seek(0), 0)
298 self.assertEqual(f.read(20), b"hello world\n")
299 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000300 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000301 self.assertEqual(f.seek(-6, 2), 6)
302 self.assertEqual(f.read(5), b"world")
303 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000304 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000305 self.assertEqual(f.seek(-6, 1), 5)
306 self.assertEqual(f.read(5), b" worl")
307 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000308 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000309 if buffered:
310 f.seek(0)
311 self.assertEqual(f.read(), b"hello world\n")
312 f.seek(6)
313 self.assertEqual(f.read(), b"world\n")
314 self.assertEqual(f.read(), b"")
315
Guido van Rossum34d69e52007-04-10 20:08:41 +0000316 LARGE = 2**31
317
Guido van Rossum53807da2007-04-10 19:01:47 +0000318 def large_file_ops(self, f):
319 assert f.readable()
320 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000321 self.assertEqual(f.seek(self.LARGE), self.LARGE)
322 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000323 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000324 self.assertEqual(f.tell(), self.LARGE + 3)
325 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000326 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000327 self.assertEqual(f.tell(), self.LARGE + 2)
328 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000329 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000330 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000331 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
332 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000333 self.assertEqual(f.read(2), b"x")
334
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000335 def test_invalid_operations(self):
336 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000337 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000338 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000339 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000340 self.assertRaises(exc, fp.read)
341 self.assertRaises(exc, fp.readline)
342 with self.open(support.TESTFN, "wb", buffering=0) as fp:
343 self.assertRaises(exc, fp.read)
344 self.assertRaises(exc, fp.readline)
345 with self.open(support.TESTFN, "rb", buffering=0) as fp:
346 self.assertRaises(exc, fp.write, b"blah")
347 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000348 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000349 self.assertRaises(exc, fp.write, b"blah")
350 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000351 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000352 self.assertRaises(exc, fp.write, "blah")
353 self.assertRaises(exc, fp.writelines, ["blah\n"])
354 # Non-zero seeking from current or end pos
355 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
356 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000357
Guido van Rossum28524c72007-02-27 05:47:44 +0000358 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000359 with self.open(support.TESTFN, "wb", buffering=0) as f:
360 self.assertEqual(f.readable(), False)
361 self.assertEqual(f.writable(), True)
362 self.assertEqual(f.seekable(), True)
363 self.write_ops(f)
364 with self.open(support.TESTFN, "rb", buffering=0) as f:
365 self.assertEqual(f.readable(), True)
366 self.assertEqual(f.writable(), False)
367 self.assertEqual(f.seekable(), True)
368 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000369
Guido van Rossum87429772007-04-10 21:06:59 +0000370 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000371 with self.open(support.TESTFN, "wb") as f:
372 self.assertEqual(f.readable(), False)
373 self.assertEqual(f.writable(), True)
374 self.assertEqual(f.seekable(), True)
375 self.write_ops(f)
376 with self.open(support.TESTFN, "rb") as f:
377 self.assertEqual(f.readable(), True)
378 self.assertEqual(f.writable(), False)
379 self.assertEqual(f.seekable(), True)
380 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000381
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000382 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000383 with self.open(support.TESTFN, "wb") as f:
384 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
385 with self.open(support.TESTFN, "rb") as f:
386 self.assertEqual(f.readline(), b"abc\n")
387 self.assertEqual(f.readline(10), b"def\n")
388 self.assertEqual(f.readline(2), b"xy")
389 self.assertEqual(f.readline(4), b"zzy\n")
390 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000391 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000392 self.assertRaises(TypeError, f.readline, 5.3)
393 with self.open(support.TESTFN, "r") as f:
394 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000395
Guido van Rossum28524c72007-02-27 05:47:44 +0000396 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000397 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000398 self.write_ops(f)
399 data = f.getvalue()
400 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000401 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000402 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000403
Guido van Rossum53807da2007-04-10 19:01:47 +0000404 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000405 # On Windows and Mac OSX this test comsumes large resources; It takes
406 # a long time to build the >2GB file and takes >2GB of disk space
407 # therefore the resource must be enabled to run this test.
408 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000409 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000410 print("\nTesting large file ops skipped on %s." % sys.platform,
411 file=sys.stderr)
412 print("It requires %d bytes and a long time." % self.LARGE,
413 file=sys.stderr)
414 print("Use 'regrtest.py -u largefile test_io' to run it.",
415 file=sys.stderr)
416 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000417 with self.open(support.TESTFN, "w+b", 0) as f:
418 self.large_file_ops(f)
419 with self.open(support.TESTFN, "w+b") as f:
420 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000421
422 def test_with_open(self):
423 for bufsize in (0, 1, 100):
424 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000425 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000426 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000427 self.assertEqual(f.closed, True)
428 f = None
429 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000430 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000431 1/0
432 except ZeroDivisionError:
433 self.assertEqual(f.closed, True)
434 else:
435 self.fail("1/0 didn't raise an exception")
436
Antoine Pitrou08838b62009-01-21 00:55:13 +0000437 # issue 5008
438 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000439 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000440 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000441 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000442 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000443 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000444 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000445 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000446 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000447
Guido van Rossum87429772007-04-10 21:06:59 +0000448 def test_destructor(self):
449 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000450 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000451 def __del__(self):
452 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000453 try:
454 f = super().__del__
455 except AttributeError:
456 pass
457 else:
458 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000459 def close(self):
460 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000461 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000462 def flush(self):
463 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000464 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000465 with support.check_warnings(('', ResourceWarning)):
466 f = MyFileIO(support.TESTFN, "wb")
467 f.write(b"xxx")
468 del f
469 support.gc_collect()
470 self.assertEqual(record, [1, 2, 3])
471 with self.open(support.TESTFN, "rb") as f:
472 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000473
474 def _check_base_destructor(self, base):
475 record = []
476 class MyIO(base):
477 def __init__(self):
478 # This exercises the availability of attributes on object
479 # destruction.
480 # (in the C version, close() is called by the tp_dealloc
481 # function, not by __del__)
482 self.on_del = 1
483 self.on_close = 2
484 self.on_flush = 3
485 def __del__(self):
486 record.append(self.on_del)
487 try:
488 f = super().__del__
489 except AttributeError:
490 pass
491 else:
492 f()
493 def close(self):
494 record.append(self.on_close)
495 super().close()
496 def flush(self):
497 record.append(self.on_flush)
498 super().flush()
499 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000500 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000501 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000502 self.assertEqual(record, [1, 2, 3])
503
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000504 def test_IOBase_destructor(self):
505 self._check_base_destructor(self.IOBase)
506
507 def test_RawIOBase_destructor(self):
508 self._check_base_destructor(self.RawIOBase)
509
510 def test_BufferedIOBase_destructor(self):
511 self._check_base_destructor(self.BufferedIOBase)
512
513 def test_TextIOBase_destructor(self):
514 self._check_base_destructor(self.TextIOBase)
515
Guido van Rossum87429772007-04-10 21:06:59 +0000516 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000517 with self.open(support.TESTFN, "wb") as f:
518 f.write(b"xxx")
519 with self.open(support.TESTFN, "rb") as f:
520 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000521
Guido van Rossumd4103952007-04-12 05:44:49 +0000522 def test_array_writes(self):
523 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000524 n = len(a.tobytes())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000525 with self.open(support.TESTFN, "wb", 0) as f:
526 self.assertEqual(f.write(a), n)
527 with self.open(support.TESTFN, "wb") as f:
528 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000529
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000530 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000531 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000532 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000533
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000534 def test_read_closed(self):
535 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000536 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000537 with self.open(support.TESTFN, "r") as f:
538 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000539 self.assertEqual(file.read(), "egg\n")
540 file.seek(0)
541 file.close()
542 self.assertRaises(ValueError, file.read)
543
544 def test_no_closefd_with_filename(self):
545 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000546 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000547
548 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000549 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000550 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000551 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000552 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000553 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000554 self.assertEqual(file.buffer.raw.closefd, False)
555
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000556 def test_garbage_collection(self):
557 # FileIO objects are collected, and collecting them flushes
558 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000559 with support.check_warnings(('', ResourceWarning)):
560 f = self.FileIO(support.TESTFN, "wb")
561 f.write(b"abcxxx")
562 f.f = f
563 wr = weakref.ref(f)
564 del f
565 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000566 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000567 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000568 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000569
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000570 def test_unbounded_file(self):
571 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
572 zero = "/dev/zero"
573 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000574 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000575 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000576 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000577 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000578 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000579 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000580 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000581 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000582 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000583 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000584 self.assertRaises(OverflowError, f.read)
585
Antoine Pitrou6be88762010-05-03 16:48:20 +0000586 def test_flush_error_on_close(self):
587 f = self.open(support.TESTFN, "wb", buffering=0)
588 def bad_flush():
589 raise IOError()
590 f.flush = bad_flush
591 self.assertRaises(IOError, f.close) # exception not swallowed
592
593 def test_multi_close(self):
594 f = self.open(support.TESTFN, "wb", buffering=0)
595 f.close()
596 f.close()
597 f.close()
598 self.assertRaises(ValueError, f.flush)
599
Antoine Pitrou328ec742010-09-14 18:37:24 +0000600 def test_RawIOBase_read(self):
601 # Exercise the default RawIOBase.read() implementation (which calls
602 # readinto() internally).
603 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
604 self.assertEqual(rawio.read(2), b"ab")
605 self.assertEqual(rawio.read(2), b"c")
606 self.assertEqual(rawio.read(2), b"d")
607 self.assertEqual(rawio.read(2), None)
608 self.assertEqual(rawio.read(2), b"ef")
609 self.assertEqual(rawio.read(2), b"g")
610 self.assertEqual(rawio.read(2), None)
611 self.assertEqual(rawio.read(2), b"")
612
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000613class CIOTest(IOTest):
614 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000615
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000616class PyIOTest(IOTest):
617 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000618
Guido van Rossuma9e20242007-03-08 00:43:48 +0000619
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000620class CommonBufferedTests:
621 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
622
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000623 def test_detach(self):
624 raw = self.MockRawIO()
625 buf = self.tp(raw)
626 self.assertIs(buf.detach(), raw)
627 self.assertRaises(ValueError, buf.detach)
628
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000629 def test_fileno(self):
630 rawio = self.MockRawIO()
631 bufio = self.tp(rawio)
632
Ezio Melottib3aedd42010-11-20 19:04:17 +0000633 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000634
635 def test_no_fileno(self):
636 # XXX will we always have fileno() function? If so, kill
637 # this test. Else, write it.
638 pass
639
640 def test_invalid_args(self):
641 rawio = self.MockRawIO()
642 bufio = self.tp(rawio)
643 # Invalid whence
644 self.assertRaises(ValueError, bufio.seek, 0, -1)
645 self.assertRaises(ValueError, bufio.seek, 0, 3)
646
647 def test_override_destructor(self):
648 tp = self.tp
649 record = []
650 class MyBufferedIO(tp):
651 def __del__(self):
652 record.append(1)
653 try:
654 f = super().__del__
655 except AttributeError:
656 pass
657 else:
658 f()
659 def close(self):
660 record.append(2)
661 super().close()
662 def flush(self):
663 record.append(3)
664 super().flush()
665 rawio = self.MockRawIO()
666 bufio = MyBufferedIO(rawio)
667 writable = bufio.writable()
668 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000669 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000670 if writable:
671 self.assertEqual(record, [1, 2, 3])
672 else:
673 self.assertEqual(record, [1, 2])
674
675 def test_context_manager(self):
676 # Test usability as a context manager
677 rawio = self.MockRawIO()
678 bufio = self.tp(rawio)
679 def _with():
680 with bufio:
681 pass
682 _with()
683 # bufio should now be closed, and using it a second time should raise
684 # a ValueError.
685 self.assertRaises(ValueError, _with)
686
687 def test_error_through_destructor(self):
688 # Test that the exception state is not modified by a destructor,
689 # even if close() fails.
690 rawio = self.CloseFailureIO()
691 def f():
692 self.tp(rawio).xyzzy
693 with support.captured_output("stderr") as s:
694 self.assertRaises(AttributeError, f)
695 s = s.getvalue().strip()
696 if s:
697 # The destructor *may* have printed an unraisable error, check it
698 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000699 self.assertTrue(s.startswith("Exception IOError: "), s)
700 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000701
Antoine Pitrou716c4442009-05-23 19:04:03 +0000702 def test_repr(self):
703 raw = self.MockRawIO()
704 b = self.tp(raw)
705 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
706 self.assertEqual(repr(b), "<%s>" % clsname)
707 raw.name = "dummy"
708 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
709 raw.name = b"dummy"
710 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
711
Antoine Pitrou6be88762010-05-03 16:48:20 +0000712 def test_flush_error_on_close(self):
713 raw = self.MockRawIO()
714 def bad_flush():
715 raise IOError()
716 raw.flush = bad_flush
717 b = self.tp(raw)
718 self.assertRaises(IOError, b.close) # exception not swallowed
719
720 def test_multi_close(self):
721 raw = self.MockRawIO()
722 b = self.tp(raw)
723 b.close()
724 b.close()
725 b.close()
726 self.assertRaises(ValueError, b.flush)
727
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000728 def test_unseekable(self):
729 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
730 self.assertRaises(self.UnsupportedOperation, bufio.tell)
731 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
732
Antoine Pitrou7f8f4182010-12-21 21:20:59 +0000733 def test_readonly_attributes(self):
734 raw = self.MockRawIO()
735 buf = self.tp(raw)
736 x = self.MockRawIO()
737 with self.assertRaises(AttributeError):
738 buf.raw = x
739
Guido van Rossum78892e42007-04-06 17:31:18 +0000740
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000741class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
742 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000743
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000744 def test_constructor(self):
745 rawio = self.MockRawIO([b"abc"])
746 bufio = self.tp(rawio)
747 bufio.__init__(rawio)
748 bufio.__init__(rawio, buffer_size=1024)
749 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000750 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000751 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
752 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
753 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
754 rawio = self.MockRawIO([b"abc"])
755 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000756 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000757
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000758 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000759 for arg in (None, 7):
760 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
761 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000762 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000763 # Invalid args
764 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000765
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000766 def test_read1(self):
767 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
768 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000769 self.assertEqual(b"a", bufio.read(1))
770 self.assertEqual(b"b", bufio.read1(1))
771 self.assertEqual(rawio._reads, 1)
772 self.assertEqual(b"c", bufio.read1(100))
773 self.assertEqual(rawio._reads, 1)
774 self.assertEqual(b"d", bufio.read1(100))
775 self.assertEqual(rawio._reads, 2)
776 self.assertEqual(b"efg", bufio.read1(100))
777 self.assertEqual(rawio._reads, 3)
778 self.assertEqual(b"", bufio.read1(100))
779 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000780 # Invalid args
781 self.assertRaises(ValueError, bufio.read1, -1)
782
783 def test_readinto(self):
784 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
785 bufio = self.tp(rawio)
786 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000787 self.assertEqual(bufio.readinto(b), 2)
788 self.assertEqual(b, b"ab")
789 self.assertEqual(bufio.readinto(b), 2)
790 self.assertEqual(b, b"cd")
791 self.assertEqual(bufio.readinto(b), 2)
792 self.assertEqual(b, b"ef")
793 self.assertEqual(bufio.readinto(b), 1)
794 self.assertEqual(b, b"gf")
795 self.assertEqual(bufio.readinto(b), 0)
796 self.assertEqual(b, b"gf")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000797
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000798 def test_readlines(self):
799 def bufio():
800 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
801 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000802 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
803 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
804 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000805
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000806 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000807 data = b"abcdefghi"
808 dlen = len(data)
809
810 tests = [
811 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
812 [ 100, [ 3, 3, 3], [ dlen ] ],
813 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
814 ]
815
816 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000817 rawio = self.MockFileIO(data)
818 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000819 pos = 0
820 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000821 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000822 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000823 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +0000824 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000825
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000826 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000827 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000828 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
829 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000830 self.assertEqual(b"abcd", bufio.read(6))
831 self.assertEqual(b"e", bufio.read(1))
832 self.assertEqual(b"fg", bufio.read())
833 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +0200834 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +0000835 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000836
Victor Stinnera80987f2011-05-25 22:47:16 +0200837 rawio = self.MockRawIO((b"a", None, None))
838 self.assertEqual(b"a", rawio.readall())
839 self.assertIsNone(rawio.readall())
840
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000841 def test_read_past_eof(self):
842 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
843 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000844
Ezio Melottib3aedd42010-11-20 19:04:17 +0000845 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000846
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000847 def test_read_all(self):
848 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
849 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000850
Ezio Melottib3aedd42010-11-20 19:04:17 +0000851 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000852
Victor Stinner45df8202010-04-28 22:31:17 +0000853 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +0000854 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000855 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000856 try:
857 # Write out many bytes with exactly the same number of 0's,
858 # 1's... 255's. This will help us check that concurrent reading
859 # doesn't duplicate or forget contents.
860 N = 1000
861 l = list(range(256)) * N
862 random.shuffle(l)
863 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000864 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000865 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000866 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000867 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000868 errors = []
869 results = []
870 def f():
871 try:
872 # Intra-buffer read then buffer-flushing read
873 for n in cycle([1, 19]):
874 s = bufio.read(n)
875 if not s:
876 break
877 # list.append() is atomic
878 results.append(s)
879 except Exception as e:
880 errors.append(e)
881 raise
882 threads = [threading.Thread(target=f) for x in range(20)]
883 for t in threads:
884 t.start()
885 time.sleep(0.02) # yield
886 for t in threads:
887 t.join()
888 self.assertFalse(errors,
889 "the following exceptions were caught: %r" % errors)
890 s = b''.join(results)
891 for i in range(256):
892 c = bytes(bytearray([i]))
893 self.assertEqual(s.count(c), N)
894 finally:
895 support.unlink(support.TESTFN)
896
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000897 def test_misbehaved_io(self):
898 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
899 bufio = self.tp(rawio)
900 self.assertRaises(IOError, bufio.seek, 0)
901 self.assertRaises(IOError, bufio.tell)
902
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000903 def test_no_extraneous_read(self):
904 # Issue #9550; when the raw IO object has satisfied the read request,
905 # we should not issue any additional reads, otherwise it may block
906 # (e.g. socket).
907 bufsize = 16
908 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
909 rawio = self.MockRawIO([b"x" * n])
910 bufio = self.tp(rawio, bufsize)
911 self.assertEqual(bufio.read(n), b"x" * n)
912 # Simple case: one raw read is enough to satisfy the request.
913 self.assertEqual(rawio._extraneous_reads, 0,
914 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
915 # A more complex case where two raw reads are needed to satisfy
916 # the request.
917 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
918 bufio = self.tp(rawio, bufsize)
919 self.assertEqual(bufio.read(n), b"x" * n)
920 self.assertEqual(rawio._extraneous_reads, 0,
921 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
922
923
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000924class CBufferedReaderTest(BufferedReaderTest):
925 tp = io.BufferedReader
926
927 def test_constructor(self):
928 BufferedReaderTest.test_constructor(self)
929 # The allocation can succeed on 32-bit builds, e.g. with more
930 # than 2GB RAM and a 64-bit kernel.
931 if sys.maxsize > 0x7FFFFFFF:
932 rawio = self.MockRawIO()
933 bufio = self.tp(rawio)
934 self.assertRaises((OverflowError, MemoryError, ValueError),
935 bufio.__init__, rawio, sys.maxsize)
936
937 def test_initialization(self):
938 rawio = self.MockRawIO([b"abc"])
939 bufio = self.tp(rawio)
940 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
941 self.assertRaises(ValueError, bufio.read)
942 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
943 self.assertRaises(ValueError, bufio.read)
944 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
945 self.assertRaises(ValueError, bufio.read)
946
947 def test_misbehaved_io_read(self):
948 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
949 bufio = self.tp(rawio)
950 # _pyio.BufferedReader seems to implement reading different, so that
951 # checking this is not so easy.
952 self.assertRaises(IOError, bufio.read, 10)
953
954 def test_garbage_collection(self):
955 # C BufferedReader objects are collected.
956 # The Python version has __del__, so it ends into gc.garbage instead
957 rawio = self.FileIO(support.TESTFN, "w+b")
958 f = self.tp(rawio)
959 f.f = f
960 wr = weakref.ref(f)
961 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000962 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000963 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000964
965class PyBufferedReaderTest(BufferedReaderTest):
966 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000967
Guido van Rossuma9e20242007-03-08 00:43:48 +0000968
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000969class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
970 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000971
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000972 def test_constructor(self):
973 rawio = self.MockRawIO()
974 bufio = self.tp(rawio)
975 bufio.__init__(rawio)
976 bufio.__init__(rawio, buffer_size=1024)
977 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000978 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000979 bufio.flush()
980 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
981 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
982 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
983 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000984 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000985 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +0000986 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000987
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000988 def test_detach_flush(self):
989 raw = self.MockRawIO()
990 buf = self.tp(raw)
991 buf.write(b"howdy!")
992 self.assertFalse(raw._write_stack)
993 buf.detach()
994 self.assertEqual(raw._write_stack, [b"howdy!"])
995
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000996 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000997 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000998 writer = self.MockRawIO()
999 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001000 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001001 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001002
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001003 def test_write_overflow(self):
1004 writer = self.MockRawIO()
1005 bufio = self.tp(writer, 8)
1006 contents = b"abcdefghijklmnop"
1007 for n in range(0, len(contents), 3):
1008 bufio.write(contents[n:n+3])
1009 flushed = b"".join(writer._write_stack)
1010 # At least (total - 8) bytes were implicitly flushed, perhaps more
1011 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001012 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001013
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001014 def check_writes(self, intermediate_func):
1015 # Lots of writes, test the flushed output is as expected.
1016 contents = bytes(range(256)) * 1000
1017 n = 0
1018 writer = self.MockRawIO()
1019 bufio = self.tp(writer, 13)
1020 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1021 def gen_sizes():
1022 for size in count(1):
1023 for i in range(15):
1024 yield size
1025 sizes = gen_sizes()
1026 while n < len(contents):
1027 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001028 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001029 intermediate_func(bufio)
1030 n += size
1031 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001032 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001033
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001034 def test_writes(self):
1035 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001036
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001037 def test_writes_and_flushes(self):
1038 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001039
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001040 def test_writes_and_seeks(self):
1041 def _seekabs(bufio):
1042 pos = bufio.tell()
1043 bufio.seek(pos + 1, 0)
1044 bufio.seek(pos - 1, 0)
1045 bufio.seek(pos, 0)
1046 self.check_writes(_seekabs)
1047 def _seekrel(bufio):
1048 pos = bufio.seek(0, 1)
1049 bufio.seek(+1, 1)
1050 bufio.seek(-1, 1)
1051 bufio.seek(pos, 0)
1052 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001053
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001054 def test_writes_and_truncates(self):
1055 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001056
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001057 def test_write_non_blocking(self):
1058 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001059 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001060
Ezio Melottib3aedd42010-11-20 19:04:17 +00001061 self.assertEqual(bufio.write(b"abcd"), 4)
1062 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001063 # 1 byte will be written, the rest will be buffered
1064 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001065 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001066
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001067 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1068 raw.block_on(b"0")
1069 try:
1070 bufio.write(b"opqrwxyz0123456789")
1071 except self.BlockingIOError as e:
1072 written = e.characters_written
1073 else:
1074 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001075 self.assertEqual(written, 16)
1076 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001077 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001078
Ezio Melottib3aedd42010-11-20 19:04:17 +00001079 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001080 s = raw.pop_written()
1081 # Previously buffered bytes were flushed
1082 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001083
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001084 def test_write_and_rewind(self):
1085 raw = io.BytesIO()
1086 bufio = self.tp(raw, 4)
1087 self.assertEqual(bufio.write(b"abcdef"), 6)
1088 self.assertEqual(bufio.tell(), 6)
1089 bufio.seek(0, 0)
1090 self.assertEqual(bufio.write(b"XY"), 2)
1091 bufio.seek(6, 0)
1092 self.assertEqual(raw.getvalue(), b"XYcdef")
1093 self.assertEqual(bufio.write(b"123456"), 6)
1094 bufio.flush()
1095 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001096
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001097 def test_flush(self):
1098 writer = self.MockRawIO()
1099 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001100 bufio.write(b"abc")
1101 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001102 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001103
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001104 def test_destructor(self):
1105 writer = self.MockRawIO()
1106 bufio = self.tp(writer, 8)
1107 bufio.write(b"abc")
1108 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001109 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001110 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001111
1112 def test_truncate(self):
1113 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001114 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001115 bufio = self.tp(raw, 8)
1116 bufio.write(b"abcdef")
1117 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001118 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001119 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001120 self.assertEqual(f.read(), b"abc")
1121
Victor Stinner45df8202010-04-28 22:31:17 +00001122 @unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001123 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001124 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001125 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001126 # Write out many bytes from many threads and test they were
1127 # all flushed.
1128 N = 1000
1129 contents = bytes(range(256)) * N
1130 sizes = cycle([1, 19])
1131 n = 0
1132 queue = deque()
1133 while n < len(contents):
1134 size = next(sizes)
1135 queue.append(contents[n:n+size])
1136 n += size
1137 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001138 # We use a real file object because it allows us to
1139 # exercise situations where the GIL is released before
1140 # writing the buffer to the raw streams. This is in addition
1141 # to concurrency issues due to switching threads in the middle
1142 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001143 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001144 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001145 errors = []
1146 def f():
1147 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001148 while True:
1149 try:
1150 s = queue.popleft()
1151 except IndexError:
1152 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001153 bufio.write(s)
1154 except Exception as e:
1155 errors.append(e)
1156 raise
1157 threads = [threading.Thread(target=f) for x in range(20)]
1158 for t in threads:
1159 t.start()
1160 time.sleep(0.02) # yield
1161 for t in threads:
1162 t.join()
1163 self.assertFalse(errors,
1164 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001165 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001166 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001167 s = f.read()
1168 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001169 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001170 finally:
1171 support.unlink(support.TESTFN)
1172
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001173 def test_misbehaved_io(self):
1174 rawio = self.MisbehavedRawIO()
1175 bufio = self.tp(rawio, 5)
1176 self.assertRaises(IOError, bufio.seek, 0)
1177 self.assertRaises(IOError, bufio.tell)
1178 self.assertRaises(IOError, bufio.write, b"abcdef")
1179
Benjamin Peterson59406a92009-03-26 17:10:29 +00001180 def test_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001181 with support.check_warnings(("max_buffer_size is deprecated",
1182 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001183 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001184
1185
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001186class CBufferedWriterTest(BufferedWriterTest):
1187 tp = io.BufferedWriter
1188
1189 def test_constructor(self):
1190 BufferedWriterTest.test_constructor(self)
1191 # The allocation can succeed on 32-bit builds, e.g. with more
1192 # than 2GB RAM and a 64-bit kernel.
1193 if sys.maxsize > 0x7FFFFFFF:
1194 rawio = self.MockRawIO()
1195 bufio = self.tp(rawio)
1196 self.assertRaises((OverflowError, MemoryError, ValueError),
1197 bufio.__init__, rawio, sys.maxsize)
1198
1199 def test_initialization(self):
1200 rawio = self.MockRawIO()
1201 bufio = self.tp(rawio)
1202 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1203 self.assertRaises(ValueError, bufio.write, b"def")
1204 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1205 self.assertRaises(ValueError, bufio.write, b"def")
1206 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1207 self.assertRaises(ValueError, bufio.write, b"def")
1208
1209 def test_garbage_collection(self):
1210 # C BufferedWriter objects are collected, and collecting them flushes
1211 # all data to disk.
1212 # The Python version has __del__, so it ends into gc.garbage instead
1213 rawio = self.FileIO(support.TESTFN, "w+b")
1214 f = self.tp(rawio)
1215 f.write(b"123xxx")
1216 f.x = f
1217 wr = weakref.ref(f)
1218 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001219 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001220 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001221 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001222 self.assertEqual(f.read(), b"123xxx")
1223
1224
1225class PyBufferedWriterTest(BufferedWriterTest):
1226 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001227
Guido van Rossum01a27522007-03-07 01:00:12 +00001228class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001229
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001230 def test_constructor(self):
1231 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001232 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001233
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001234 def test_detach(self):
1235 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1236 self.assertRaises(self.UnsupportedOperation, pair.detach)
1237
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001238 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001239 with support.check_warnings(("max_buffer_size is deprecated",
1240 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001241 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001242
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001243 def test_constructor_with_not_readable(self):
1244 class NotReadable(MockRawIO):
1245 def readable(self):
1246 return False
1247
1248 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1249
1250 def test_constructor_with_not_writeable(self):
1251 class NotWriteable(MockRawIO):
1252 def writable(self):
1253 return False
1254
1255 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1256
1257 def test_read(self):
1258 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1259
1260 self.assertEqual(pair.read(3), b"abc")
1261 self.assertEqual(pair.read(1), b"d")
1262 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001263 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1264 self.assertEqual(pair.read(None), b"abc")
1265
1266 def test_readlines(self):
1267 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1268 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1269 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1270 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001271
1272 def test_read1(self):
1273 # .read1() is delegated to the underlying reader object, so this test
1274 # can be shallow.
1275 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1276
1277 self.assertEqual(pair.read1(3), b"abc")
1278
1279 def test_readinto(self):
1280 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1281
1282 data = bytearray(5)
1283 self.assertEqual(pair.readinto(data), 5)
1284 self.assertEqual(data, b"abcde")
1285
1286 def test_write(self):
1287 w = self.MockRawIO()
1288 pair = self.tp(self.MockRawIO(), w)
1289
1290 pair.write(b"abc")
1291 pair.flush()
1292 pair.write(b"def")
1293 pair.flush()
1294 self.assertEqual(w._write_stack, [b"abc", b"def"])
1295
1296 def test_peek(self):
1297 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1298
1299 self.assertTrue(pair.peek(3).startswith(b"abc"))
1300 self.assertEqual(pair.read(3), b"abc")
1301
1302 def test_readable(self):
1303 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1304 self.assertTrue(pair.readable())
1305
1306 def test_writeable(self):
1307 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1308 self.assertTrue(pair.writable())
1309
1310 def test_seekable(self):
1311 # BufferedRWPairs are never seekable, even if their readers and writers
1312 # are.
1313 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1314 self.assertFalse(pair.seekable())
1315
1316 # .flush() is delegated to the underlying writer object and has been
1317 # tested in the test_write method.
1318
1319 def test_close_and_closed(self):
1320 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1321 self.assertFalse(pair.closed)
1322 pair.close()
1323 self.assertTrue(pair.closed)
1324
1325 def test_isatty(self):
1326 class SelectableIsAtty(MockRawIO):
1327 def __init__(self, isatty):
1328 MockRawIO.__init__(self)
1329 self._isatty = isatty
1330
1331 def isatty(self):
1332 return self._isatty
1333
1334 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1335 self.assertFalse(pair.isatty())
1336
1337 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1338 self.assertTrue(pair.isatty())
1339
1340 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1341 self.assertTrue(pair.isatty())
1342
1343 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1344 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001345
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001346class CBufferedRWPairTest(BufferedRWPairTest):
1347 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001348
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001349class PyBufferedRWPairTest(BufferedRWPairTest):
1350 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001351
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001352
1353class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1354 read_mode = "rb+"
1355 write_mode = "wb+"
1356
1357 def test_constructor(self):
1358 BufferedReaderTest.test_constructor(self)
1359 BufferedWriterTest.test_constructor(self)
1360
1361 def test_read_and_write(self):
1362 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001363 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001364
1365 self.assertEqual(b"as", rw.read(2))
1366 rw.write(b"ddd")
1367 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001368 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001369 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001370 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001371
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001372 def test_seek_and_tell(self):
1373 raw = self.BytesIO(b"asdfghjkl")
1374 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001375
Ezio Melottib3aedd42010-11-20 19:04:17 +00001376 self.assertEqual(b"as", rw.read(2))
1377 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001378 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001379 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00001380
1381 rw.write(b"asdf")
1382 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001383 self.assertEqual(b"asdfasdfl", rw.read())
1384 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001385 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001386 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00001387 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001388 self.assertEqual(7, rw.tell())
1389 self.assertEqual(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001390 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001391
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001392 def check_flush_and_read(self, read_func):
1393 raw = self.BytesIO(b"abcdefghi")
1394 bufio = self.tp(raw)
1395
Ezio Melottib3aedd42010-11-20 19:04:17 +00001396 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001397 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001398 self.assertEqual(b"ef", read_func(bufio, 2))
1399 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001400 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001401 self.assertEqual(6, bufio.tell())
1402 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001403 raw.seek(0, 0)
1404 raw.write(b"XYZ")
1405 # flush() resets the read buffer
1406 bufio.flush()
1407 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001408 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001409
1410 def test_flush_and_read(self):
1411 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1412
1413 def test_flush_and_readinto(self):
1414 def _readinto(bufio, n=-1):
1415 b = bytearray(n if n >= 0 else 9999)
1416 n = bufio.readinto(b)
1417 return bytes(b[:n])
1418 self.check_flush_and_read(_readinto)
1419
1420 def test_flush_and_peek(self):
1421 def _peek(bufio, n=-1):
1422 # This relies on the fact that the buffer can contain the whole
1423 # raw stream, otherwise peek() can return less.
1424 b = bufio.peek(n)
1425 if n != -1:
1426 b = b[:n]
1427 bufio.seek(len(b), 1)
1428 return b
1429 self.check_flush_and_read(_peek)
1430
1431 def test_flush_and_write(self):
1432 raw = self.BytesIO(b"abcdefghi")
1433 bufio = self.tp(raw)
1434
1435 bufio.write(b"123")
1436 bufio.flush()
1437 bufio.write(b"45")
1438 bufio.flush()
1439 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001440 self.assertEqual(b"12345fghi", raw.getvalue())
1441 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001442
1443 def test_threads(self):
1444 BufferedReaderTest.test_threads(self)
1445 BufferedWriterTest.test_threads(self)
1446
1447 def test_writes_and_peek(self):
1448 def _peek(bufio):
1449 bufio.peek(1)
1450 self.check_writes(_peek)
1451 def _peek(bufio):
1452 pos = bufio.tell()
1453 bufio.seek(-1, 1)
1454 bufio.peek(1)
1455 bufio.seek(pos, 0)
1456 self.check_writes(_peek)
1457
1458 def test_writes_and_reads(self):
1459 def _read(bufio):
1460 bufio.seek(-1, 1)
1461 bufio.read(1)
1462 self.check_writes(_read)
1463
1464 def test_writes_and_read1s(self):
1465 def _read1(bufio):
1466 bufio.seek(-1, 1)
1467 bufio.read1(1)
1468 self.check_writes(_read1)
1469
1470 def test_writes_and_readintos(self):
1471 def _read(bufio):
1472 bufio.seek(-1, 1)
1473 bufio.readinto(bytearray(1))
1474 self.check_writes(_read)
1475
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001476 def test_write_after_readahead(self):
1477 # Issue #6629: writing after the buffer was filled by readahead should
1478 # first rewind the raw stream.
1479 for overwrite_size in [1, 5]:
1480 raw = self.BytesIO(b"A" * 10)
1481 bufio = self.tp(raw, 4)
1482 # Trigger readahead
1483 self.assertEqual(bufio.read(1), b"A")
1484 self.assertEqual(bufio.tell(), 1)
1485 # Overwriting should rewind the raw stream if it needs so
1486 bufio.write(b"B" * overwrite_size)
1487 self.assertEqual(bufio.tell(), overwrite_size + 1)
1488 # If the write size was smaller than the buffer size, flush() and
1489 # check that rewind happens.
1490 bufio.flush()
1491 self.assertEqual(bufio.tell(), overwrite_size + 1)
1492 s = raw.getvalue()
1493 self.assertEqual(s,
1494 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1495
Antoine Pitrou7c404892011-05-13 00:13:33 +02001496 def test_write_rewind_write(self):
1497 # Various combinations of reading / writing / seeking backwards / writing again
1498 def mutate(bufio, pos1, pos2):
1499 assert pos2 >= pos1
1500 # Fill the buffer
1501 bufio.seek(pos1)
1502 bufio.read(pos2 - pos1)
1503 bufio.write(b'\x02')
1504 # This writes earlier than the previous write, but still inside
1505 # the buffer.
1506 bufio.seek(pos1)
1507 bufio.write(b'\x01')
1508
1509 b = b"\x80\x81\x82\x83\x84"
1510 for i in range(0, len(b)):
1511 for j in range(i, len(b)):
1512 raw = self.BytesIO(b)
1513 bufio = self.tp(raw, 100)
1514 mutate(bufio, i, j)
1515 bufio.flush()
1516 expected = bytearray(b)
1517 expected[j] = 2
1518 expected[i] = 1
1519 self.assertEqual(raw.getvalue(), expected,
1520 "failed result for i=%d, j=%d" % (i, j))
1521
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001522 def test_truncate_after_read_or_write(self):
1523 raw = self.BytesIO(b"A" * 10)
1524 bufio = self.tp(raw, 100)
1525 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1526 self.assertEqual(bufio.truncate(), 2)
1527 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1528 self.assertEqual(bufio.truncate(), 4)
1529
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001530 def test_misbehaved_io(self):
1531 BufferedReaderTest.test_misbehaved_io(self)
1532 BufferedWriterTest.test_misbehaved_io(self)
1533
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001534 # You can't construct a BufferedRandom over a non-seekable stream.
1535 test_unseekable = None
1536
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001537class CBufferedRandomTest(BufferedRandomTest):
1538 tp = io.BufferedRandom
1539
1540 def test_constructor(self):
1541 BufferedRandomTest.test_constructor(self)
1542 # The allocation can succeed on 32-bit builds, e.g. with more
1543 # than 2GB RAM and a 64-bit kernel.
1544 if sys.maxsize > 0x7FFFFFFF:
1545 rawio = self.MockRawIO()
1546 bufio = self.tp(rawio)
1547 self.assertRaises((OverflowError, MemoryError, ValueError),
1548 bufio.__init__, rawio, sys.maxsize)
1549
1550 def test_garbage_collection(self):
1551 CBufferedReaderTest.test_garbage_collection(self)
1552 CBufferedWriterTest.test_garbage_collection(self)
1553
1554class PyBufferedRandomTest(BufferedRandomTest):
1555 tp = pyio.BufferedRandom
1556
1557
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001558# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1559# properties:
1560# - A single output character can correspond to many bytes of input.
1561# - The number of input bytes to complete the character can be
1562# undetermined until the last input byte is received.
1563# - The number of input bytes can vary depending on previous input.
1564# - A single input byte can correspond to many characters of output.
1565# - The number of output characters can be undetermined until the
1566# last input byte is received.
1567# - The number of output characters can vary depending on previous input.
1568
1569class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1570 """
1571 For testing seek/tell behavior with a stateful, buffering decoder.
1572
1573 Input is a sequence of words. Words may be fixed-length (length set
1574 by input) or variable-length (period-terminated). In variable-length
1575 mode, extra periods are ignored. Possible words are:
1576 - 'i' followed by a number sets the input length, I (maximum 99).
1577 When I is set to 0, words are space-terminated.
1578 - 'o' followed by a number sets the output length, O (maximum 99).
1579 - Any other word is converted into a word followed by a period on
1580 the output. The output word consists of the input word truncated
1581 or padded out with hyphens to make its length equal to O. If O
1582 is 0, the word is output verbatim without truncating or padding.
1583 I and O are initially set to 1. When I changes, any buffered input is
1584 re-scanned according to the new I. EOF also terminates the last word.
1585 """
1586
1587 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001588 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001589 self.reset()
1590
1591 def __repr__(self):
1592 return '<SID %x>' % id(self)
1593
1594 def reset(self):
1595 self.i = 1
1596 self.o = 1
1597 self.buffer = bytearray()
1598
1599 def getstate(self):
1600 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1601 return bytes(self.buffer), i*100 + o
1602
1603 def setstate(self, state):
1604 buffer, io = state
1605 self.buffer = bytearray(buffer)
1606 i, o = divmod(io, 100)
1607 self.i, self.o = i ^ 1, o ^ 1
1608
1609 def decode(self, input, final=False):
1610 output = ''
1611 for b in input:
1612 if self.i == 0: # variable-length, terminated with period
1613 if b == ord('.'):
1614 if self.buffer:
1615 output += self.process_word()
1616 else:
1617 self.buffer.append(b)
1618 else: # fixed-length, terminate after self.i bytes
1619 self.buffer.append(b)
1620 if len(self.buffer) == self.i:
1621 output += self.process_word()
1622 if final and self.buffer: # EOF terminates the last word
1623 output += self.process_word()
1624 return output
1625
1626 def process_word(self):
1627 output = ''
1628 if self.buffer[0] == ord('i'):
1629 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1630 elif self.buffer[0] == ord('o'):
1631 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1632 else:
1633 output = self.buffer.decode('ascii')
1634 if len(output) < self.o:
1635 output += '-'*self.o # pad out with hyphens
1636 if self.o:
1637 output = output[:self.o] # truncate to output length
1638 output += '.'
1639 self.buffer = bytearray()
1640 return output
1641
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001642 codecEnabled = False
1643
1644 @classmethod
1645 def lookupTestDecoder(cls, name):
1646 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001647 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001648 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001649 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001650 incrementalencoder=None,
1651 streamreader=None, streamwriter=None,
1652 incrementaldecoder=cls)
1653
1654# Register the previous decoder for testing.
1655# Disabled by default, tests will enable it.
1656codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1657
1658
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001659class StatefulIncrementalDecoderTest(unittest.TestCase):
1660 """
1661 Make sure the StatefulIncrementalDecoder actually works.
1662 """
1663
1664 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001665 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001666 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001667 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001668 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001669 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001670 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001671 # I=0, O=6 (variable-length input, fixed-length output)
1672 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1673 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001674 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001675 # I=6, O=3 (fixed-length input > fixed-length output)
1676 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1677 # I=0, then 3; O=29, then 15 (with longer output)
1678 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1679 'a----------------------------.' +
1680 'b----------------------------.' +
1681 'cde--------------------------.' +
1682 'abcdefghijabcde.' +
1683 'a.b------------.' +
1684 '.c.------------.' +
1685 'd.e------------.' +
1686 'k--------------.' +
1687 'l--------------.' +
1688 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001689 ]
1690
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001691 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001692 # Try a few one-shot test cases.
1693 for input, eof, output in self.test_cases:
1694 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001695 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001696
1697 # Also test an unfinished decode, followed by forcing EOF.
1698 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001699 self.assertEqual(d.decode(b'oiabcd'), '')
1700 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001701
1702class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001703
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001704 def setUp(self):
1705 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1706 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001707 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001708
Guido van Rossumd0712812007-04-11 16:32:43 +00001709 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001710 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001711
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001712 def test_constructor(self):
1713 r = self.BytesIO(b"\xc3\xa9\n\n")
1714 b = self.BufferedReader(r, 1000)
1715 t = self.TextIOWrapper(b)
1716 t.__init__(b, encoding="latin1", newline="\r\n")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001717 self.assertEqual(t.encoding, "latin1")
1718 self.assertEqual(t.line_buffering, False)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001719 t.__init__(b, encoding="utf8", line_buffering=True)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001720 self.assertEqual(t.encoding, "utf8")
1721 self.assertEqual(t.line_buffering, True)
1722 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001723 self.assertRaises(TypeError, t.__init__, b, newline=42)
1724 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1725
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001726 def test_detach(self):
1727 r = self.BytesIO()
1728 b = self.BufferedWriter(r)
1729 t = self.TextIOWrapper(b)
1730 self.assertIs(t.detach(), b)
1731
1732 t = self.TextIOWrapper(b, encoding="ascii")
1733 t.write("howdy")
1734 self.assertFalse(r.getvalue())
1735 t.detach()
1736 self.assertEqual(r.getvalue(), b"howdy")
1737 self.assertRaises(ValueError, t.detach)
1738
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001739 def test_repr(self):
1740 raw = self.BytesIO("hello".encode("utf-8"))
1741 b = self.BufferedReader(raw)
1742 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001743 modname = self.TextIOWrapper.__module__
1744 self.assertEqual(repr(t),
1745 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1746 raw.name = "dummy"
1747 self.assertEqual(repr(t),
1748 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001749 t.mode = "r"
1750 self.assertEqual(repr(t),
1751 "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001752 raw.name = b"dummy"
1753 self.assertEqual(repr(t),
Antoine Pitroua4815ca2011-01-09 20:38:15 +00001754 "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001755
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001756 def test_line_buffering(self):
1757 r = self.BytesIO()
1758 b = self.BufferedWriter(r, 1000)
1759 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001760 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001761 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001762 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001763 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001764 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001765 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001766
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001767 def test_encoding(self):
1768 # Check the encoding attribute is always set, and valid
1769 b = self.BytesIO()
1770 t = self.TextIOWrapper(b, encoding="utf8")
1771 self.assertEqual(t.encoding, "utf8")
1772 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001773 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001774 codecs.lookup(t.encoding)
1775
1776 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001777 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001778 b = self.BytesIO(b"abc\n\xff\n")
1779 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001780 self.assertRaises(UnicodeError, t.read)
1781 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001782 b = self.BytesIO(b"abc\n\xff\n")
1783 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001784 self.assertRaises(UnicodeError, t.read)
1785 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001786 b = self.BytesIO(b"abc\n\xff\n")
1787 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001788 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001789 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001790 b = self.BytesIO(b"abc\n\xff\n")
1791 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001792 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001793
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001794 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001795 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001796 b = self.BytesIO()
1797 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001798 self.assertRaises(UnicodeError, t.write, "\xff")
1799 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001800 b = self.BytesIO()
1801 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001802 self.assertRaises(UnicodeError, t.write, "\xff")
1803 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001804 b = self.BytesIO()
1805 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001806 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001807 t.write("abc\xffdef\n")
1808 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001809 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001810 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001811 b = self.BytesIO()
1812 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001813 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001814 t.write("abc\xffdef\n")
1815 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001816 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001817
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001818 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001819 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1820
1821 tests = [
1822 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001823 [ '', input_lines ],
1824 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1825 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1826 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001827 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001828 encodings = (
1829 'utf-8', 'latin-1',
1830 'utf-16', 'utf-16-le', 'utf-16-be',
1831 'utf-32', 'utf-32-le', 'utf-32-be',
1832 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001833
Guido van Rossum8358db22007-08-18 21:39:55 +00001834 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001835 # character in TextIOWrapper._pending_line.
1836 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001837 # XXX: str.encode() should return bytes
1838 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001839 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001840 for bufsize in range(1, 10):
1841 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001842 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1843 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001844 encoding=encoding)
1845 if do_reads:
1846 got_lines = []
1847 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001848 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001849 if c2 == '':
1850 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00001851 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001852 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001853 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001854 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001855
1856 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001857 self.assertEqual(got_line, exp_line)
1858 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00001859
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001860 def test_newlines_input(self):
1861 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001862 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1863 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001864 (None, normalized.decode("ascii").splitlines(True)),
1865 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001866 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1867 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1868 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001869 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001870 buf = self.BytesIO(testdata)
1871 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001872 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00001873 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001874 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00001875
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001876 def test_newlines_output(self):
1877 testdict = {
1878 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1879 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1880 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1881 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1882 }
1883 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1884 for newline, expected in tests:
1885 buf = self.BytesIO()
1886 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1887 txt.write("AAA\nB")
1888 txt.write("BB\nCCC\n")
1889 txt.write("X\rY\r\nZ")
1890 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001891 self.assertEqual(buf.closed, False)
1892 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001893
1894 def test_destructor(self):
1895 l = []
1896 base = self.BytesIO
1897 class MyBytesIO(base):
1898 def close(self):
1899 l.append(self.getvalue())
1900 base.close(self)
1901 b = MyBytesIO()
1902 t = self.TextIOWrapper(b, encoding="ascii")
1903 t.write("abc")
1904 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001905 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001906 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001907
1908 def test_override_destructor(self):
1909 record = []
1910 class MyTextIO(self.TextIOWrapper):
1911 def __del__(self):
1912 record.append(1)
1913 try:
1914 f = super().__del__
1915 except AttributeError:
1916 pass
1917 else:
1918 f()
1919 def close(self):
1920 record.append(2)
1921 super().close()
1922 def flush(self):
1923 record.append(3)
1924 super().flush()
1925 b = self.BytesIO()
1926 t = MyTextIO(b, encoding="ascii")
1927 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001928 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001929 self.assertEqual(record, [1, 2, 3])
1930
1931 def test_error_through_destructor(self):
1932 # Test that the exception state is not modified by a destructor,
1933 # even if close() fails.
1934 rawio = self.CloseFailureIO()
1935 def f():
1936 self.TextIOWrapper(rawio).xyzzy
1937 with support.captured_output("stderr") as s:
1938 self.assertRaises(AttributeError, f)
1939 s = s.getvalue().strip()
1940 if s:
1941 # The destructor *may* have printed an unraisable error, check it
1942 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001943 self.assertTrue(s.startswith("Exception IOError: "), s)
1944 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001945
Guido van Rossum9b76da62007-04-11 01:09:03 +00001946 # Systematic tests of the text I/O API
1947
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001948 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001949 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1950 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001951 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001952 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00001953 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001954 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001955 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001956 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00001957 self.assertEqual(f.tell(), 0)
1958 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001959 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001960 self.assertEqual(f.seek(0), 0)
1961 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001962 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001963 self.assertEqual(f.read(2), "ab")
1964 self.assertEqual(f.read(1), "c")
1965 self.assertEqual(f.read(1), "")
1966 self.assertEqual(f.read(), "")
1967 self.assertEqual(f.tell(), cookie)
1968 self.assertEqual(f.seek(0), 0)
1969 self.assertEqual(f.seek(0, 2), cookie)
1970 self.assertEqual(f.write("def"), 3)
1971 self.assertEqual(f.seek(cookie), cookie)
1972 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001973 if enc.startswith("utf"):
1974 self.multi_line_test(f, enc)
1975 f.close()
1976
1977 def multi_line_test(self, f, enc):
1978 f.seek(0)
1979 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001980 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001981 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001982 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001983 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001984 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001985 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001986 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001987 wlines.append((f.tell(), line))
1988 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001989 f.seek(0)
1990 rlines = []
1991 while True:
1992 pos = f.tell()
1993 line = f.readline()
1994 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001995 break
1996 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001997 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001998
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001999 def test_telling(self):
2000 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002001 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002002 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002003 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002004 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002005 p2 = f.tell()
2006 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002007 self.assertEqual(f.tell(), p0)
2008 self.assertEqual(f.readline(), "\xff\n")
2009 self.assertEqual(f.tell(), p1)
2010 self.assertEqual(f.readline(), "\xff\n")
2011 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002012 f.seek(0)
2013 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002014 self.assertEqual(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002015 self.assertRaises(IOError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002016 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002017 f.close()
2018
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002019 def test_seeking(self):
2020 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002021 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002022 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002023 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002024 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002025 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002026 suffix = bytes(u_suffix.encode("utf-8"))
2027 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002028 with self.open(support.TESTFN, "wb") as f:
2029 f.write(line*2)
2030 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2031 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002032 self.assertEqual(s, str(prefix, "ascii"))
2033 self.assertEqual(f.tell(), prefix_size)
2034 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002035
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002036 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002037 # Regression test for a specific bug
2038 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002039 with self.open(support.TESTFN, "wb") as f:
2040 f.write(data)
2041 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2042 f._CHUNK_SIZE # Just test that it exists
2043 f._CHUNK_SIZE = 2
2044 f.readline()
2045 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002046
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002047 def test_seek_and_tell(self):
2048 #Test seek/tell using the StatefulIncrementalDecoder.
2049 # Make test faster by doing smaller seeks
2050 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002051
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002052 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002053 """Tell/seek to various points within a data stream and ensure
2054 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002055 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002056 f.write(data)
2057 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002058 f = self.open(support.TESTFN, encoding='test_decoder')
2059 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002060 decoded = f.read()
2061 f.close()
2062
Neal Norwitze2b07052008-03-18 19:52:05 +00002063 for i in range(min_pos, len(decoded) + 1): # seek positions
2064 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002065 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002066 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002067 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002068 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002069 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002070 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002071 f.close()
2072
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002073 # Enable the test decoder.
2074 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002075
2076 # Run the tests.
2077 try:
2078 # Try each test case.
2079 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002080 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002081
2082 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002083 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2084 offset = CHUNK_SIZE - len(input)//2
2085 prefix = b'.'*offset
2086 # Don't bother seeking into the prefix (takes too long).
2087 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002088 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002089
2090 # Ensure our test decoder won't interfere with subsequent tests.
2091 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002092 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002093
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002094 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002095 data = "1234567890"
2096 tests = ("utf-16",
2097 "utf-16-le",
2098 "utf-16-be",
2099 "utf-32",
2100 "utf-32-le",
2101 "utf-32-be")
2102 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002103 buf = self.BytesIO()
2104 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002105 # Check if the BOM is written only once (see issue1753).
2106 f.write(data)
2107 f.write(data)
2108 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002109 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002110 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002111 self.assertEqual(f.read(), data * 2)
2112 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002113
Benjamin Petersona1b49012009-03-31 23:11:32 +00002114 def test_unreadable(self):
2115 class UnReadable(self.BytesIO):
2116 def readable(self):
2117 return False
2118 txt = self.TextIOWrapper(UnReadable())
2119 self.assertRaises(IOError, txt.read)
2120
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002121 def test_read_one_by_one(self):
2122 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002123 reads = ""
2124 while True:
2125 c = txt.read(1)
2126 if not c:
2127 break
2128 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002129 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002130
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002131 def test_readlines(self):
2132 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2133 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2134 txt.seek(0)
2135 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2136 txt.seek(0)
2137 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2138
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002139 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002140 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002141 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002142 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002143 reads = ""
2144 while True:
2145 c = txt.read(128)
2146 if not c:
2147 break
2148 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002149 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002150
2151 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002152 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002153
2154 # read one char at a time
2155 reads = ""
2156 while True:
2157 c = txt.read(1)
2158 if not c:
2159 break
2160 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002161 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002162
2163 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002164 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002165 txt._CHUNK_SIZE = 4
2166
2167 reads = ""
2168 while True:
2169 c = txt.read(4)
2170 if not c:
2171 break
2172 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00002173 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002174
2175 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002176 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002177 txt._CHUNK_SIZE = 4
2178
2179 reads = txt.read(4)
2180 reads += txt.read(4)
2181 reads += txt.readline()
2182 reads += txt.readline()
2183 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002184 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002185
2186 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002187 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002188 txt._CHUNK_SIZE = 4
2189
2190 reads = txt.read(4)
2191 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002192 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002193
2194 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002195 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002196 txt._CHUNK_SIZE = 4
2197
2198 reads = txt.read(4)
2199 pos = txt.tell()
2200 txt.seek(0)
2201 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002202 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002203
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002204 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002205 buffer = self.BytesIO(self.testdata)
2206 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002207
2208 self.assertEqual(buffer.seekable(), txt.seekable())
2209
Antoine Pitroue4501852009-05-14 18:55:55 +00002210 def test_append_bom(self):
2211 # The BOM is not written again when appending to a non-empty file
2212 filename = support.TESTFN
2213 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2214 with self.open(filename, 'w', encoding=charset) as f:
2215 f.write('aaa')
2216 pos = f.tell()
2217 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002218 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002219
2220 with self.open(filename, 'a', encoding=charset) as f:
2221 f.write('xxx')
2222 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002223 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002224
2225 def test_seek_bom(self):
2226 # Same test, but when seeking manually
2227 filename = support.TESTFN
2228 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2229 with self.open(filename, 'w', encoding=charset) as f:
2230 f.write('aaa')
2231 pos = f.tell()
2232 with self.open(filename, 'r+', encoding=charset) as f:
2233 f.seek(pos)
2234 f.write('zzz')
2235 f.seek(0)
2236 f.write('bbb')
2237 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002238 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00002239
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002240 def test_errors_property(self):
2241 with self.open(support.TESTFN, "w") as f:
2242 self.assertEqual(f.errors, "strict")
2243 with self.open(support.TESTFN, "w", errors="replace") as f:
2244 self.assertEqual(f.errors, "replace")
2245
Victor Stinner45df8202010-04-28 22:31:17 +00002246 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002247 def test_threads_write(self):
2248 # Issue6750: concurrent writes could duplicate data
2249 event = threading.Event()
2250 with self.open(support.TESTFN, "w", buffering=1) as f:
2251 def run(n):
2252 text = "Thread%03d\n" % n
2253 event.wait()
2254 f.write(text)
2255 threads = [threading.Thread(target=lambda n=x: run(n))
2256 for x in range(20)]
2257 for t in threads:
2258 t.start()
2259 time.sleep(0.02)
2260 event.set()
2261 for t in threads:
2262 t.join()
2263 with self.open(support.TESTFN) as f:
2264 content = f.read()
2265 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002266 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002267
Antoine Pitrou6be88762010-05-03 16:48:20 +00002268 def test_flush_error_on_close(self):
2269 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2270 def bad_flush():
2271 raise IOError()
2272 txt.flush = bad_flush
2273 self.assertRaises(IOError, txt.close) # exception not swallowed
2274
2275 def test_multi_close(self):
2276 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2277 txt.close()
2278 txt.close()
2279 txt.close()
2280 self.assertRaises(ValueError, txt.flush)
2281
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002282 def test_unseekable(self):
2283 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
2284 self.assertRaises(self.UnsupportedOperation, txt.tell)
2285 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
2286
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00002287 def test_readonly_attributes(self):
2288 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2289 buf = self.BytesIO(self.testdata)
2290 with self.assertRaises(AttributeError):
2291 txt.buffer = buf
2292
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002293class CTextIOWrapperTest(TextIOWrapperTest):
2294
2295 def test_initialization(self):
2296 r = self.BytesIO(b"\xc3\xa9\n\n")
2297 b = self.BufferedReader(r, 1000)
2298 t = self.TextIOWrapper(b)
2299 self.assertRaises(TypeError, t.__init__, b, newline=42)
2300 self.assertRaises(ValueError, t.read)
2301 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2302 self.assertRaises(ValueError, t.read)
2303
2304 def test_garbage_collection(self):
2305 # C TextIOWrapper objects are collected, and collecting them flushes
2306 # all data to disk.
2307 # The Python version has __del__, so it ends in gc.garbage instead.
2308 rawio = io.FileIO(support.TESTFN, "wb")
2309 b = self.BufferedWriter(rawio)
2310 t = self.TextIOWrapper(b, encoding="ascii")
2311 t.write("456def")
2312 t.x = t
2313 wr = weakref.ref(t)
2314 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002315 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002316 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002317 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002318 self.assertEqual(f.read(), b"456def")
2319
2320class PyTextIOWrapperTest(TextIOWrapperTest):
2321 pass
2322
2323
2324class IncrementalNewlineDecoderTest(unittest.TestCase):
2325
2326 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002327 # UTF-8 specific tests for a newline decoder
2328 def _check_decode(b, s, **kwargs):
2329 # We exercise getstate() / setstate() as well as decode()
2330 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002331 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002332 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002333 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002334
Antoine Pitrou180a3362008-12-14 16:36:46 +00002335 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002336
Antoine Pitrou180a3362008-12-14 16:36:46 +00002337 _check_decode(b'\xe8', "")
2338 _check_decode(b'\xa2', "")
2339 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002340
Antoine Pitrou180a3362008-12-14 16:36:46 +00002341 _check_decode(b'\xe8', "")
2342 _check_decode(b'\xa2', "")
2343 _check_decode(b'\x88', "\u8888")
2344
2345 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002346 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2347
Antoine Pitrou180a3362008-12-14 16:36:46 +00002348 decoder.reset()
2349 _check_decode(b'\n', "\n")
2350 _check_decode(b'\r', "")
2351 _check_decode(b'', "\n", final=True)
2352 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002353
Antoine Pitrou180a3362008-12-14 16:36:46 +00002354 _check_decode(b'\r', "")
2355 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002356
Antoine Pitrou180a3362008-12-14 16:36:46 +00002357 _check_decode(b'\r\r\n', "\n\n")
2358 _check_decode(b'\r', "")
2359 _check_decode(b'\r', "\n")
2360 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002361
Antoine Pitrou180a3362008-12-14 16:36:46 +00002362 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2363 _check_decode(b'\xe8\xa2\x88', "\u8888")
2364 _check_decode(b'\n', "\n")
2365 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2366 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002367
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002368 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002369 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002370 if encoding is not None:
2371 encoder = codecs.getincrementalencoder(encoding)()
2372 def _decode_bytewise(s):
2373 # Decode one byte at a time
2374 for b in encoder.encode(s):
2375 result.append(decoder.decode(bytes([b])))
2376 else:
2377 encoder = None
2378 def _decode_bytewise(s):
2379 # Decode one char at a time
2380 for c in s:
2381 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002382 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002383 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002384 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00002385 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002386 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002387 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002388 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002389 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002390 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002391 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002392 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002393 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002394 input = "abc"
2395 if encoder is not None:
2396 encoder.reset()
2397 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002398 self.assertEqual(decoder.decode(input), "abc")
2399 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00002400
2401 def test_newline_decoder(self):
2402 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002403 # None meaning the IncrementalNewlineDecoder takes unicode input
2404 # rather than bytes input
2405 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002406 'utf-16', 'utf-16-le', 'utf-16-be',
2407 'utf-32', 'utf-32-le', 'utf-32-be',
2408 )
2409 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002410 decoder = enc and codecs.getincrementaldecoder(enc)()
2411 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2412 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002413 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002414 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2415 self.check_newline_decoding_utf8(decoder)
2416
Antoine Pitrou66913e22009-03-06 23:40:56 +00002417 def test_newline_bytes(self):
2418 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2419 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002420 self.assertEqual(dec.newlines, None)
2421 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2422 self.assertEqual(dec.newlines, None)
2423 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2424 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00002425 dec = self.IncrementalNewlineDecoder(None, translate=False)
2426 _check(dec)
2427 dec = self.IncrementalNewlineDecoder(None, translate=True)
2428 _check(dec)
2429
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002430class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2431 pass
2432
2433class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2434 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002435
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002436
Guido van Rossum01a27522007-03-07 01:00:12 +00002437# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002438
Guido van Rossum5abbf752007-08-27 17:39:33 +00002439class MiscIOTest(unittest.TestCase):
2440
Barry Warsaw40e82462008-11-20 20:14:50 +00002441 def tearDown(self):
2442 support.unlink(support.TESTFN)
2443
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002444 def test___all__(self):
2445 for name in self.io.__all__:
2446 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002447 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002448 if name == "open":
2449 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002450 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002451 self.assertTrue(issubclass(obj, Exception), name)
2452 elif not name.startswith("SEEK_"):
2453 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002454
Barry Warsaw40e82462008-11-20 20:14:50 +00002455 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002456 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002457 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002458 f.close()
2459
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002460 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002461 self.assertEqual(f.name, support.TESTFN)
2462 self.assertEqual(f.buffer.name, support.TESTFN)
2463 self.assertEqual(f.buffer.raw.name, support.TESTFN)
2464 self.assertEqual(f.mode, "U")
2465 self.assertEqual(f.buffer.mode, "rb")
2466 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002467 f.close()
2468
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002469 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002470 self.assertEqual(f.mode, "w+")
2471 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
2472 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002473
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002474 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002475 self.assertEqual(g.mode, "wb")
2476 self.assertEqual(g.raw.mode, "wb")
2477 self.assertEqual(g.name, f.fileno())
2478 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00002479 f.close()
2480 g.close()
2481
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002482 def test_io_after_close(self):
2483 for kwargs in [
2484 {"mode": "w"},
2485 {"mode": "wb"},
2486 {"mode": "w", "buffering": 1},
2487 {"mode": "w", "buffering": 2},
2488 {"mode": "wb", "buffering": 0},
2489 {"mode": "r"},
2490 {"mode": "rb"},
2491 {"mode": "r", "buffering": 1},
2492 {"mode": "r", "buffering": 2},
2493 {"mode": "rb", "buffering": 0},
2494 {"mode": "w+"},
2495 {"mode": "w+b"},
2496 {"mode": "w+", "buffering": 1},
2497 {"mode": "w+", "buffering": 2},
2498 {"mode": "w+b", "buffering": 0},
2499 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002500 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002501 f.close()
2502 self.assertRaises(ValueError, f.flush)
2503 self.assertRaises(ValueError, f.fileno)
2504 self.assertRaises(ValueError, f.isatty)
2505 self.assertRaises(ValueError, f.__iter__)
2506 if hasattr(f, "peek"):
2507 self.assertRaises(ValueError, f.peek, 1)
2508 self.assertRaises(ValueError, f.read)
2509 if hasattr(f, "read1"):
2510 self.assertRaises(ValueError, f.read1, 1024)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02002511 if hasattr(f, "readall"):
2512 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002513 if hasattr(f, "readinto"):
2514 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2515 self.assertRaises(ValueError, f.readline)
2516 self.assertRaises(ValueError, f.readlines)
2517 self.assertRaises(ValueError, f.seek, 0)
2518 self.assertRaises(ValueError, f.tell)
2519 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002520 self.assertRaises(ValueError, f.write,
2521 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002522 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002523 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002524
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002525 def test_blockingioerror(self):
2526 # Various BlockingIOError issues
2527 self.assertRaises(TypeError, self.BlockingIOError)
2528 self.assertRaises(TypeError, self.BlockingIOError, 1)
2529 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2530 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2531 b = self.BlockingIOError(1, "")
2532 self.assertEqual(b.characters_written, 0)
2533 class C(str):
2534 pass
2535 c = C("")
2536 b = self.BlockingIOError(1, c)
2537 c.b = b
2538 b.c = c
2539 wr = weakref.ref(c)
2540 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002541 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002542 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002543
2544 def test_abcs(self):
2545 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002546 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2547 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2548 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2549 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002550
2551 def _check_abc_inheritance(self, abcmodule):
2552 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002553 self.assertIsInstance(f, abcmodule.IOBase)
2554 self.assertIsInstance(f, abcmodule.RawIOBase)
2555 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2556 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002557 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002558 self.assertIsInstance(f, abcmodule.IOBase)
2559 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2560 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2561 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002562 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002563 self.assertIsInstance(f, abcmodule.IOBase)
2564 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2565 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2566 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002567
2568 def test_abc_inheritance(self):
2569 # Test implementations inherit from their respective ABCs
2570 self._check_abc_inheritance(self)
2571
2572 def test_abc_inheritance_official(self):
2573 # Test implementations inherit from the official ABCs of the
2574 # baseline "io" module.
2575 self._check_abc_inheritance(io)
2576
Antoine Pitroue033e062010-10-29 10:38:18 +00002577 def _check_warn_on_dealloc(self, *args, **kwargs):
2578 f = open(*args, **kwargs)
2579 r = repr(f)
2580 with self.assertWarns(ResourceWarning) as cm:
2581 f = None
2582 support.gc_collect()
2583 self.assertIn(r, str(cm.warning.args[0]))
2584
2585 def test_warn_on_dealloc(self):
2586 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
2587 self._check_warn_on_dealloc(support.TESTFN, "wb")
2588 self._check_warn_on_dealloc(support.TESTFN, "w")
2589
2590 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
2591 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00002592 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00002593 for fd in fds:
2594 try:
2595 os.close(fd)
2596 except EnvironmentError as e:
2597 if e.errno != errno.EBADF:
2598 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00002599 self.addCleanup(cleanup_fds)
2600 r, w = os.pipe()
2601 fds += r, w
2602 self._check_warn_on_dealloc(r, *args, **kwargs)
2603 # When using closefd=False, there's no warning
2604 r, w = os.pipe()
2605 fds += r, w
2606 with warnings.catch_warnings(record=True) as recorded:
2607 open(r, *args, closefd=False, **kwargs)
2608 support.gc_collect()
2609 self.assertEqual(recorded, [])
Antoine Pitroue033e062010-10-29 10:38:18 +00002610
2611 def test_warn_on_dealloc_fd(self):
2612 self._check_warn_on_dealloc_fd("rb", buffering=0)
2613 self._check_warn_on_dealloc_fd("rb")
2614 self._check_warn_on_dealloc_fd("r")
2615
2616
Antoine Pitrou243757e2010-11-05 21:15:39 +00002617 def test_pickling(self):
2618 # Pickling file objects is forbidden
2619 for kwargs in [
2620 {"mode": "w"},
2621 {"mode": "wb"},
2622 {"mode": "wb", "buffering": 0},
2623 {"mode": "r"},
2624 {"mode": "rb"},
2625 {"mode": "rb", "buffering": 0},
2626 {"mode": "w+"},
2627 {"mode": "w+b"},
2628 {"mode": "w+b", "buffering": 0},
2629 ]:
2630 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
2631 with self.open(support.TESTFN, **kwargs) as f:
2632 self.assertRaises(TypeError, pickle.dumps, f, protocol)
2633
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002634class CMiscIOTest(MiscIOTest):
2635 io = io
2636
2637class PyMiscIOTest(MiscIOTest):
2638 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002639
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002640
2641@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2642class SignalsTest(unittest.TestCase):
2643
2644 def setUp(self):
2645 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2646
2647 def tearDown(self):
2648 signal.signal(signal.SIGALRM, self.oldalrm)
2649
2650 def alarm_interrupt(self, sig, frame):
2651 1/0
2652
2653 @unittest.skipUnless(threading, 'Threading required for this test.')
2654 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2655 """Check that a partial write, when it gets interrupted, properly
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002656 invokes the signal handler, and bubbles up the exception raised
2657 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002658 read_results = []
2659 def _read():
2660 s = os.read(r, 1)
2661 read_results.append(s)
2662 t = threading.Thread(target=_read)
2663 t.daemon = True
2664 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00002665 fdopen_kwargs["closefd"] = False
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002666 try:
2667 wio = self.io.open(w, **fdopen_kwargs)
2668 t.start()
2669 signal.alarm(1)
2670 # Fill the pipe enough that the write will be blocking.
2671 # It will be interrupted by the timer armed above. Since the
2672 # other thread has read one byte, the low-level write will
2673 # return with a successful (partial) result rather than an EINTR.
2674 # The buffered IO layer must check for pending signal
2675 # handlers, which in this case will invoke alarm_interrupt().
2676 self.assertRaises(ZeroDivisionError,
2677 wio.write, item * (1024 * 1024))
2678 t.join()
2679 # We got one byte, get another one and check that it isn't a
2680 # repeat of the first one.
2681 read_results.append(os.read(r, 1))
2682 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2683 finally:
2684 os.close(w)
2685 os.close(r)
2686 # This is deliberate. If we didn't close the file descriptor
2687 # before closing wio, wio would try to flush its internal
2688 # buffer, and block again.
2689 try:
2690 wio.close()
2691 except IOError as e:
2692 if e.errno != errno.EBADF:
2693 raise
2694
2695 def test_interrupted_write_unbuffered(self):
2696 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2697
2698 def test_interrupted_write_buffered(self):
2699 self.check_interrupted_write(b"xy", b"xy", mode="wb")
2700
2701 def test_interrupted_write_text(self):
2702 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2703
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002704 def check_reentrant_write(self, data, **fdopen_kwargs):
2705 def on_alarm(*args):
2706 # Will be called reentrantly from the same thread
2707 wio.write(data)
2708 1/0
2709 signal.signal(signal.SIGALRM, on_alarm)
2710 r, w = os.pipe()
2711 wio = self.io.open(w, **fdopen_kwargs)
2712 try:
2713 signal.alarm(1)
2714 # Either the reentrant call to wio.write() fails with RuntimeError,
2715 # or the signal handler raises ZeroDivisionError.
2716 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2717 while 1:
2718 for i in range(100):
2719 wio.write(data)
2720 wio.flush()
2721 # Make sure the buffer doesn't fill up and block further writes
2722 os.read(r, len(data) * 100)
2723 exc = cm.exception
2724 if isinstance(exc, RuntimeError):
2725 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2726 finally:
2727 wio.close()
2728 os.close(r)
2729
2730 def test_reentrant_write_buffered(self):
2731 self.check_reentrant_write(b"xy", mode="wb")
2732
2733 def test_reentrant_write_text(self):
2734 self.check_reentrant_write("xy", mode="w", encoding="ascii")
2735
Antoine Pitroud843c2d2011-02-25 21:34:39 +00002736 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2737 """Check that a buffered read, when it gets interrupted (either
2738 returning a partial result or EINTR), properly invokes the signal
2739 handler and retries if the latter returned successfully."""
2740 r, w = os.pipe()
2741 fdopen_kwargs["closefd"] = False
2742 def alarm_handler(sig, frame):
2743 os.write(w, b"bar")
2744 signal.signal(signal.SIGALRM, alarm_handler)
2745 try:
2746 rio = self.io.open(r, **fdopen_kwargs)
2747 os.write(w, b"foo")
2748 signal.alarm(1)
2749 # Expected behaviour:
2750 # - first raw read() returns partial b"foo"
2751 # - second raw read() returns EINTR
2752 # - third raw read() returns b"bar"
2753 self.assertEqual(decode(rio.read(6)), "foobar")
2754 finally:
2755 rio.close()
2756 os.close(w)
2757 os.close(r)
2758
2759 def test_interrupterd_read_retry_buffered(self):
2760 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2761 mode="rb")
2762
2763 def test_interrupterd_read_retry_text(self):
2764 self.check_interrupted_read_retry(lambda x: x,
2765 mode="r")
2766
2767 @unittest.skipUnless(threading, 'Threading required for this test.')
2768 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2769 """Check that a buffered write, when it gets interrupted (either
2770 returning a partial result or EINTR), properly invokes the signal
2771 handler and retries if the latter returned successfully."""
2772 select = support.import_module("select")
2773 # A quantity that exceeds the buffer size of an anonymous pipe's
2774 # write end.
2775 N = 1024 * 1024
2776 r, w = os.pipe()
2777 fdopen_kwargs["closefd"] = False
2778 # We need a separate thread to read from the pipe and allow the
2779 # write() to finish. This thread is started after the SIGALRM is
2780 # received (forcing a first EINTR in write()).
2781 read_results = []
2782 write_finished = False
2783 def _read():
2784 while not write_finished:
2785 while r in select.select([r], [], [], 1.0)[0]:
2786 s = os.read(r, 1024)
2787 read_results.append(s)
2788 t = threading.Thread(target=_read)
2789 t.daemon = True
2790 def alarm1(sig, frame):
2791 signal.signal(signal.SIGALRM, alarm2)
2792 signal.alarm(1)
2793 def alarm2(sig, frame):
2794 t.start()
2795 signal.signal(signal.SIGALRM, alarm1)
2796 try:
2797 wio = self.io.open(w, **fdopen_kwargs)
2798 signal.alarm(1)
2799 # Expected behaviour:
2800 # - first raw write() is partial (because of the limited pipe buffer
2801 # and the first alarm)
2802 # - second raw write() returns EINTR (because of the second alarm)
2803 # - subsequent write()s are successful (either partial or complete)
2804 self.assertEqual(N, wio.write(item * N))
2805 wio.flush()
2806 write_finished = True
2807 t.join()
2808 self.assertEqual(N, sum(len(x) for x in read_results))
2809 finally:
2810 write_finished = True
2811 os.close(w)
2812 os.close(r)
2813 # This is deliberate. If we didn't close the file descriptor
2814 # before closing wio, wio would try to flush its internal
2815 # buffer, and could block (in case of failure).
2816 try:
2817 wio.close()
2818 except IOError as e:
2819 if e.errno != errno.EBADF:
2820 raise
2821
2822 def test_interrupterd_write_retry_buffered(self):
2823 self.check_interrupted_write_retry(b"x", mode="wb")
2824
2825 def test_interrupterd_write_retry_text(self):
2826 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
2827
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002828
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002829class CSignalsTest(SignalsTest):
2830 io = io
2831
2832class PySignalsTest(SignalsTest):
2833 io = pyio
2834
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00002835 # Handling reentrancy issues would slow down _pyio even more, so the
2836 # tests are disabled.
2837 test_reentrant_write_buffered = None
2838 test_reentrant_write_text = None
2839
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002840
Guido van Rossum28524c72007-02-27 05:47:44 +00002841def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002842 tests = (CIOTest, PyIOTest,
2843 CBufferedReaderTest, PyBufferedReaderTest,
2844 CBufferedWriterTest, PyBufferedWriterTest,
2845 CBufferedRWPairTest, PyBufferedRWPairTest,
2846 CBufferedRandomTest, PyBufferedRandomTest,
2847 StatefulIncrementalDecoderTest,
2848 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2849 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00002850 CMiscIOTest, PyMiscIOTest,
2851 CSignalsTest, PySignalsTest,
2852 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002853
2854 # Put the namespaces of the IO module we are testing and some useful mock
2855 # classes in the __dict__ of each test.
2856 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
Antoine Pitrou328ec742010-09-14 18:37:24 +00002857 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002858 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2859 c_io_ns = {name : getattr(io, name) for name in all_members}
2860 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2861 globs = globals()
2862 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2863 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2864 # Avoid turning open into a bound method.
2865 py_io_ns["open"] = pyio.OpenWrapper
2866 for test in tests:
2867 if test.__name__.startswith("C"):
2868 for name, obj in c_io_ns.items():
2869 setattr(test, name, obj)
2870 elif test.__name__.startswith("Py"):
2871 for name, obj in py_io_ns.items():
2872 setattr(test, name, obj)
2873
2874 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002875
2876if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002877 test_main()