blob: c584b92040997e7aa94a80a96578b1a8511a6b1e [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import random
Guido van Rossum28524c72007-02-27 05:47:44 +000027import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000028import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import abc
Georg Brandl1b37e872010-03-14 10:45:50 +000030from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000031from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000032from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000033
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000034import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000035import io # C implementation of io
36import _pyio as pyio # Python implementation of io
Victor Stinner45df8202010-04-28 22:31:17 +000037try:
38 import threading
39except ImportError:
40 threading = None
Guido van Rossum28524c72007-02-27 05:47:44 +000041
Guido van Rossuma9e20242007-03-08 00:43:48 +000042
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000043def _default_chunk_size():
44 """Get the default TextIOWrapper chunk size"""
45 with open(__file__, "r", encoding="latin1") as f:
46 return f._CHUNK_SIZE
47
48
49class MockRawIO:
Guido van Rossuma9e20242007-03-08 00:43:48 +000050
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000051 def __init__(self, read_stack=()):
52 self._read_stack = list(read_stack)
53 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000054 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000055 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000056
57 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000058 self._reads += 1
Guido van Rossum68bbcd22007-02-27 17:19:33 +000059 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000060 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000061 except:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000062 self._extraneous_reads += 1
Guido van Rossum78892e42007-04-06 17:31:18 +000063 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000064
Guido van Rossum01a27522007-03-07 01:00:12 +000065 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000066 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000067 return len(b)
68
69 def writable(self):
70 return True
71
Guido van Rossum68bbcd22007-02-27 17:19:33 +000072 def fileno(self):
73 return 42
74
75 def readable(self):
76 return True
77
Guido van Rossum01a27522007-03-07 01:00:12 +000078 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000079 return True
80
Guido van Rossum01a27522007-03-07 01:00:12 +000081 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000082 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000083
84 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000085 return 0 # same comment as above
86
87 def readinto(self, buf):
88 self._reads += 1
89 max_len = len(buf)
90 try:
91 data = self._read_stack[0]
92 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +000093 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000094 return 0
95 if data is None:
96 del self._read_stack[0]
97 return None
98 n = len(data)
99 if len(data) <= max_len:
100 del self._read_stack[0]
101 buf[:n] = data
102 return n
103 else:
104 buf[:] = data[:max_len]
105 self._read_stack[0] = data[max_len:]
106 return max_len
107
108 def truncate(self, pos=None):
109 return pos
110
111class CMockRawIO(MockRawIO, io.RawIOBase):
112 pass
113
114class PyMockRawIO(MockRawIO, pyio.RawIOBase):
115 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000116
Guido van Rossuma9e20242007-03-08 00:43:48 +0000117
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000118class MisbehavedRawIO(MockRawIO):
119 def write(self, b):
120 return super().write(b) * 2
121
122 def read(self, n=None):
123 return super().read(n) * 2
124
125 def seek(self, pos, whence):
126 return -123
127
128 def tell(self):
129 return -456
130
131 def readinto(self, buf):
132 super().readinto(buf)
133 return len(buf) * 5
134
135class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
136 pass
137
138class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
139 pass
140
141
142class CloseFailureIO(MockRawIO):
143 closed = 0
144
145 def close(self):
146 if not self.closed:
147 self.closed = 1
148 raise IOError
149
150class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
151 pass
152
153class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
154 pass
155
156
157class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000158
159 def __init__(self, data):
160 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000161 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000162
163 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000164 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000165 self.read_history.append(None if res is None else len(res))
166 return res
167
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000168 def readinto(self, b):
169 res = super().readinto(b)
170 self.read_history.append(res)
171 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000172
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000173class CMockFileIO(MockFileIO, io.BytesIO):
174 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000175
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000176class PyMockFileIO(MockFileIO, pyio.BytesIO):
177 pass
178
179
180class MockNonBlockWriterIO:
181
182 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000183 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000184 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000185
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000186 def pop_written(self):
187 s = b"".join(self._write_stack)
188 self._write_stack[:] = []
189 return s
190
191 def block_on(self, char):
192 """Block when a given char is encountered."""
193 self._blocker_char = char
194
195 def readable(self):
196 return True
197
198 def seekable(self):
199 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000200
Guido van Rossum01a27522007-03-07 01:00:12 +0000201 def writable(self):
202 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000203
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000204 def write(self, b):
205 b = bytes(b)
206 n = -1
207 if self._blocker_char:
208 try:
209 n = b.index(self._blocker_char)
210 except ValueError:
211 pass
212 else:
213 self._blocker_char = None
214 self._write_stack.append(b[:n])
215 raise self.BlockingIOError(0, "test blocking", n)
216 self._write_stack.append(b)
217 return len(b)
218
219class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
220 BlockingIOError = io.BlockingIOError
221
222class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
223 BlockingIOError = pyio.BlockingIOError
224
Guido van Rossuma9e20242007-03-08 00:43:48 +0000225
Guido van Rossum28524c72007-02-27 05:47:44 +0000226class IOTest(unittest.TestCase):
227
Neal Norwitze7789b12008-03-24 06:18:09 +0000228 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000229 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000230
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000231 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000232 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000233
Guido van Rossum28524c72007-02-27 05:47:44 +0000234 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000235 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000236 f.truncate(0)
237 self.assertEqual(f.tell(), 5)
238 f.seek(0)
239
240 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000241 self.assertEqual(f.seek(0), 0)
242 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000243 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000244 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000245 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000246 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000247 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000248 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000249 self.assertEqual(f.seek(-1, 2), 13)
250 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000251
Guido van Rossum87429772007-04-10 21:06:59 +0000252 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000253 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000254 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000255
Guido van Rossum9b76da62007-04-11 01:09:03 +0000256 def read_ops(self, f, buffered=False):
257 data = f.read(5)
258 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000259 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000260 self.assertEqual(f.readinto(data), 5)
261 self.assertEqual(data, b" worl")
262 self.assertEqual(f.readinto(data), 2)
263 self.assertEqual(len(data), 5)
264 self.assertEqual(data[:2], b"d\n")
265 self.assertEqual(f.seek(0), 0)
266 self.assertEqual(f.read(20), b"hello world\n")
267 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000268 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000269 self.assertEqual(f.seek(-6, 2), 6)
270 self.assertEqual(f.read(5), b"world")
271 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000272 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000273 self.assertEqual(f.seek(-6, 1), 5)
274 self.assertEqual(f.read(5), b" worl")
275 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000276 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000277 if buffered:
278 f.seek(0)
279 self.assertEqual(f.read(), b"hello world\n")
280 f.seek(6)
281 self.assertEqual(f.read(), b"world\n")
282 self.assertEqual(f.read(), b"")
283
Guido van Rossum34d69e52007-04-10 20:08:41 +0000284 LARGE = 2**31
285
Guido van Rossum53807da2007-04-10 19:01:47 +0000286 def large_file_ops(self, f):
287 assert f.readable()
288 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000289 self.assertEqual(f.seek(self.LARGE), self.LARGE)
290 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000291 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000292 self.assertEqual(f.tell(), self.LARGE + 3)
293 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000294 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000295 self.assertEqual(f.tell(), self.LARGE + 2)
296 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000297 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000298 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000299 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
300 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000301 self.assertEqual(f.read(2), b"x")
302
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000303 def test_invalid_operations(self):
304 # Try writing on a file opened in read mode and vice-versa.
305 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000306 with self.open(support.TESTFN, mode) as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000307 self.assertRaises(IOError, fp.read)
308 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000309 with self.open(support.TESTFN, "rb") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000310 self.assertRaises(IOError, fp.write, b"blah")
311 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000312 with self.open(support.TESTFN, "r") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000313 self.assertRaises(IOError, fp.write, "blah")
314 self.assertRaises(IOError, fp.writelines, ["blah\n"])
315
Guido van Rossum28524c72007-02-27 05:47:44 +0000316 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000317 with self.open(support.TESTFN, "wb", buffering=0) as f:
318 self.assertEqual(f.readable(), False)
319 self.assertEqual(f.writable(), True)
320 self.assertEqual(f.seekable(), True)
321 self.write_ops(f)
322 with self.open(support.TESTFN, "rb", buffering=0) as f:
323 self.assertEqual(f.readable(), True)
324 self.assertEqual(f.writable(), False)
325 self.assertEqual(f.seekable(), True)
326 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000327
Guido van Rossum87429772007-04-10 21:06:59 +0000328 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000329 with self.open(support.TESTFN, "wb") as f:
330 self.assertEqual(f.readable(), False)
331 self.assertEqual(f.writable(), True)
332 self.assertEqual(f.seekable(), True)
333 self.write_ops(f)
334 with self.open(support.TESTFN, "rb") as f:
335 self.assertEqual(f.readable(), True)
336 self.assertEqual(f.writable(), False)
337 self.assertEqual(f.seekable(), True)
338 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000339
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000340 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000341 with self.open(support.TESTFN, "wb") as f:
342 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
343 with self.open(support.TESTFN, "rb") as f:
344 self.assertEqual(f.readline(), b"abc\n")
345 self.assertEqual(f.readline(10), b"def\n")
346 self.assertEqual(f.readline(2), b"xy")
347 self.assertEqual(f.readline(4), b"zzy\n")
348 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000349 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000350 self.assertRaises(TypeError, f.readline, 5.3)
351 with self.open(support.TESTFN, "r") as f:
352 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000353
Guido van Rossum28524c72007-02-27 05:47:44 +0000354 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000355 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000356 self.write_ops(f)
357 data = f.getvalue()
358 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000359 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000360 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000361
Guido van Rossum53807da2007-04-10 19:01:47 +0000362 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000363 # On Windows and Mac OSX this test comsumes large resources; It takes
364 # a long time to build the >2GB file and takes >2GB of disk space
365 # therefore the resource must be enabled to run this test.
366 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000367 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000368 print("\nTesting large file ops skipped on %s." % sys.platform,
369 file=sys.stderr)
370 print("It requires %d bytes and a long time." % self.LARGE,
371 file=sys.stderr)
372 print("Use 'regrtest.py -u largefile test_io' to run it.",
373 file=sys.stderr)
374 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000375 with self.open(support.TESTFN, "w+b", 0) as f:
376 self.large_file_ops(f)
377 with self.open(support.TESTFN, "w+b") as f:
378 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000379
380 def test_with_open(self):
381 for bufsize in (0, 1, 100):
382 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000383 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000384 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000385 self.assertEqual(f.closed, True)
386 f = None
387 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000388 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000389 1/0
390 except ZeroDivisionError:
391 self.assertEqual(f.closed, True)
392 else:
393 self.fail("1/0 didn't raise an exception")
394
Antoine Pitrou08838b62009-01-21 00:55:13 +0000395 # issue 5008
396 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000397 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000398 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000399 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000400 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000401 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000402 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000403 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000404 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000405
Guido van Rossum87429772007-04-10 21:06:59 +0000406 def test_destructor(self):
407 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000408 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000409 def __del__(self):
410 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000411 try:
412 f = super().__del__
413 except AttributeError:
414 pass
415 else:
416 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000417 def close(self):
418 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000419 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000420 def flush(self):
421 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000422 super().flush()
423 f = MyFileIO(support.TESTFN, "wb")
424 f.write(b"xxx")
425 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000426 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000427 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000428 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000429 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000430
431 def _check_base_destructor(self, base):
432 record = []
433 class MyIO(base):
434 def __init__(self):
435 # This exercises the availability of attributes on object
436 # destruction.
437 # (in the C version, close() is called by the tp_dealloc
438 # function, not by __del__)
439 self.on_del = 1
440 self.on_close = 2
441 self.on_flush = 3
442 def __del__(self):
443 record.append(self.on_del)
444 try:
445 f = super().__del__
446 except AttributeError:
447 pass
448 else:
449 f()
450 def close(self):
451 record.append(self.on_close)
452 super().close()
453 def flush(self):
454 record.append(self.on_flush)
455 super().flush()
456 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000457 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000458 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000459 self.assertEqual(record, [1, 2, 3])
460
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000461 def test_IOBase_destructor(self):
462 self._check_base_destructor(self.IOBase)
463
464 def test_RawIOBase_destructor(self):
465 self._check_base_destructor(self.RawIOBase)
466
467 def test_BufferedIOBase_destructor(self):
468 self._check_base_destructor(self.BufferedIOBase)
469
470 def test_TextIOBase_destructor(self):
471 self._check_base_destructor(self.TextIOBase)
472
Guido van Rossum87429772007-04-10 21:06:59 +0000473 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000474 with self.open(support.TESTFN, "wb") as f:
475 f.write(b"xxx")
476 with self.open(support.TESTFN, "rb") as f:
477 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000478
Guido van Rossumd4103952007-04-12 05:44:49 +0000479 def test_array_writes(self):
480 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000481 n = len(a.tostring())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000482 with self.open(support.TESTFN, "wb", 0) as f:
483 self.assertEqual(f.write(a), n)
484 with self.open(support.TESTFN, "wb") as f:
485 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000486
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000487 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000488 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000489 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000490
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000491 def test_read_closed(self):
492 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000493 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000494 with self.open(support.TESTFN, "r") as f:
495 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000496 self.assertEqual(file.read(), "egg\n")
497 file.seek(0)
498 file.close()
499 self.assertRaises(ValueError, file.read)
500
501 def test_no_closefd_with_filename(self):
502 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000503 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000504
505 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000506 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000507 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000508 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000509 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000510 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000511 self.assertEqual(file.buffer.raw.closefd, False)
512
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000513 def test_garbage_collection(self):
514 # FileIO objects are collected, and collecting them flushes
515 # all data to disk.
516 f = self.FileIO(support.TESTFN, "wb")
517 f.write(b"abcxxx")
518 f.f = f
519 wr = weakref.ref(f)
520 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000521 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000522 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000523 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000524 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000525
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000526 def test_unbounded_file(self):
527 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
528 zero = "/dev/zero"
529 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000530 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000531 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000532 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000533 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000534 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000535 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000536 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000537 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000538 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000539 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000540 self.assertRaises(OverflowError, f.read)
541
Antoine Pitrou6be88762010-05-03 16:48:20 +0000542 def test_flush_error_on_close(self):
543 f = self.open(support.TESTFN, "wb", buffering=0)
544 def bad_flush():
545 raise IOError()
546 f.flush = bad_flush
547 self.assertRaises(IOError, f.close) # exception not swallowed
548
549 def test_multi_close(self):
550 f = self.open(support.TESTFN, "wb", buffering=0)
551 f.close()
552 f.close()
553 f.close()
554 self.assertRaises(ValueError, f.flush)
555
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000556class CIOTest(IOTest):
557 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000558
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000559class PyIOTest(IOTest):
560 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000561
Guido van Rossuma9e20242007-03-08 00:43:48 +0000562
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000563class CommonBufferedTests:
564 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
565
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000566 def test_detach(self):
567 raw = self.MockRawIO()
568 buf = self.tp(raw)
569 self.assertIs(buf.detach(), raw)
570 self.assertRaises(ValueError, buf.detach)
571
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000572 def test_fileno(self):
573 rawio = self.MockRawIO()
574 bufio = self.tp(rawio)
575
576 self.assertEquals(42, bufio.fileno())
577
578 def test_no_fileno(self):
579 # XXX will we always have fileno() function? If so, kill
580 # this test. Else, write it.
581 pass
582
583 def test_invalid_args(self):
584 rawio = self.MockRawIO()
585 bufio = self.tp(rawio)
586 # Invalid whence
587 self.assertRaises(ValueError, bufio.seek, 0, -1)
588 self.assertRaises(ValueError, bufio.seek, 0, 3)
589
590 def test_override_destructor(self):
591 tp = self.tp
592 record = []
593 class MyBufferedIO(tp):
594 def __del__(self):
595 record.append(1)
596 try:
597 f = super().__del__
598 except AttributeError:
599 pass
600 else:
601 f()
602 def close(self):
603 record.append(2)
604 super().close()
605 def flush(self):
606 record.append(3)
607 super().flush()
608 rawio = self.MockRawIO()
609 bufio = MyBufferedIO(rawio)
610 writable = bufio.writable()
611 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000612 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000613 if writable:
614 self.assertEqual(record, [1, 2, 3])
615 else:
616 self.assertEqual(record, [1, 2])
617
618 def test_context_manager(self):
619 # Test usability as a context manager
620 rawio = self.MockRawIO()
621 bufio = self.tp(rawio)
622 def _with():
623 with bufio:
624 pass
625 _with()
626 # bufio should now be closed, and using it a second time should raise
627 # a ValueError.
628 self.assertRaises(ValueError, _with)
629
630 def test_error_through_destructor(self):
631 # Test that the exception state is not modified by a destructor,
632 # even if close() fails.
633 rawio = self.CloseFailureIO()
634 def f():
635 self.tp(rawio).xyzzy
636 with support.captured_output("stderr") as s:
637 self.assertRaises(AttributeError, f)
638 s = s.getvalue().strip()
639 if s:
640 # The destructor *may* have printed an unraisable error, check it
641 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000642 self.assertTrue(s.startswith("Exception IOError: "), s)
643 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000644
Antoine Pitrou716c4442009-05-23 19:04:03 +0000645 def test_repr(self):
646 raw = self.MockRawIO()
647 b = self.tp(raw)
648 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
649 self.assertEqual(repr(b), "<%s>" % clsname)
650 raw.name = "dummy"
651 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
652 raw.name = b"dummy"
653 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
654
Antoine Pitrou6be88762010-05-03 16:48:20 +0000655 def test_flush_error_on_close(self):
656 raw = self.MockRawIO()
657 def bad_flush():
658 raise IOError()
659 raw.flush = bad_flush
660 b = self.tp(raw)
661 self.assertRaises(IOError, b.close) # exception not swallowed
662
663 def test_multi_close(self):
664 raw = self.MockRawIO()
665 b = self.tp(raw)
666 b.close()
667 b.close()
668 b.close()
669 self.assertRaises(ValueError, b.flush)
670
Guido van Rossum78892e42007-04-06 17:31:18 +0000671
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000672class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
673 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000674
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000675 def test_constructor(self):
676 rawio = self.MockRawIO([b"abc"])
677 bufio = self.tp(rawio)
678 bufio.__init__(rawio)
679 bufio.__init__(rawio, buffer_size=1024)
680 bufio.__init__(rawio, buffer_size=16)
681 self.assertEquals(b"abc", bufio.read())
682 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
683 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
684 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
685 rawio = self.MockRawIO([b"abc"])
686 bufio.__init__(rawio)
687 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000688
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000689 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000690 for arg in (None, 7):
691 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
692 bufio = self.tp(rawio)
693 self.assertEquals(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000694 # Invalid args
695 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000696
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000697 def test_read1(self):
698 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
699 bufio = self.tp(rawio)
700 self.assertEquals(b"a", bufio.read(1))
701 self.assertEquals(b"b", bufio.read1(1))
702 self.assertEquals(rawio._reads, 1)
703 self.assertEquals(b"c", bufio.read1(100))
704 self.assertEquals(rawio._reads, 1)
705 self.assertEquals(b"d", bufio.read1(100))
706 self.assertEquals(rawio._reads, 2)
707 self.assertEquals(b"efg", bufio.read1(100))
708 self.assertEquals(rawio._reads, 3)
709 self.assertEquals(b"", bufio.read1(100))
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000710 self.assertEquals(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000711 # Invalid args
712 self.assertRaises(ValueError, bufio.read1, -1)
713
714 def test_readinto(self):
715 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
716 bufio = self.tp(rawio)
717 b = bytearray(2)
718 self.assertEquals(bufio.readinto(b), 2)
719 self.assertEquals(b, b"ab")
720 self.assertEquals(bufio.readinto(b), 2)
721 self.assertEquals(b, b"cd")
722 self.assertEquals(bufio.readinto(b), 2)
723 self.assertEquals(b, b"ef")
724 self.assertEquals(bufio.readinto(b), 1)
725 self.assertEquals(b, b"gf")
726 self.assertEquals(bufio.readinto(b), 0)
727 self.assertEquals(b, b"gf")
728
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000729 def test_readlines(self):
730 def bufio():
731 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
732 return self.tp(rawio)
733 self.assertEquals(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
734 self.assertEquals(bufio().readlines(5), [b"abc\n", b"d\n"])
735 self.assertEquals(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
736
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000737 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000738 data = b"abcdefghi"
739 dlen = len(data)
740
741 tests = [
742 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
743 [ 100, [ 3, 3, 3], [ dlen ] ],
744 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
745 ]
746
747 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000748 rawio = self.MockFileIO(data)
749 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000750 pos = 0
751 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000752 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000753 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000754 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000755 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000756
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000757 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000758 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000759 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
760 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000761
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000762 self.assertEquals(b"abcd", bufio.read(6))
763 self.assertEquals(b"e", bufio.read(1))
764 self.assertEquals(b"fg", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000765 self.assertEquals(b"", bufio.peek(1))
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000766 self.assertTrue(None is bufio.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000767 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000768
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000769 def test_read_past_eof(self):
770 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
771 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000772
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000773 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000774
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000775 def test_read_all(self):
776 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
777 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000778
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000779 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000780
Victor Stinner45df8202010-04-28 22:31:17 +0000781 @unittest.skipUnless(threading, 'Threading required for this test.')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000782 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000783 try:
784 # Write out many bytes with exactly the same number of 0's,
785 # 1's... 255's. This will help us check that concurrent reading
786 # doesn't duplicate or forget contents.
787 N = 1000
788 l = list(range(256)) * N
789 random.shuffle(l)
790 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000791 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000792 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000793 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000794 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000795 errors = []
796 results = []
797 def f():
798 try:
799 # Intra-buffer read then buffer-flushing read
800 for n in cycle([1, 19]):
801 s = bufio.read(n)
802 if not s:
803 break
804 # list.append() is atomic
805 results.append(s)
806 except Exception as e:
807 errors.append(e)
808 raise
809 threads = [threading.Thread(target=f) for x in range(20)]
810 for t in threads:
811 t.start()
812 time.sleep(0.02) # yield
813 for t in threads:
814 t.join()
815 self.assertFalse(errors,
816 "the following exceptions were caught: %r" % errors)
817 s = b''.join(results)
818 for i in range(256):
819 c = bytes(bytearray([i]))
820 self.assertEqual(s.count(c), N)
821 finally:
822 support.unlink(support.TESTFN)
823
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000824 def test_misbehaved_io(self):
825 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
826 bufio = self.tp(rawio)
827 self.assertRaises(IOError, bufio.seek, 0)
828 self.assertRaises(IOError, bufio.tell)
829
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000830 def test_no_extraneous_read(self):
831 # Issue #9550; when the raw IO object has satisfied the read request,
832 # we should not issue any additional reads, otherwise it may block
833 # (e.g. socket).
834 bufsize = 16
835 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
836 rawio = self.MockRawIO([b"x" * n])
837 bufio = self.tp(rawio, bufsize)
838 self.assertEqual(bufio.read(n), b"x" * n)
839 # Simple case: one raw read is enough to satisfy the request.
840 self.assertEqual(rawio._extraneous_reads, 0,
841 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
842 # A more complex case where two raw reads are needed to satisfy
843 # the request.
844 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
845 bufio = self.tp(rawio, bufsize)
846 self.assertEqual(bufio.read(n), b"x" * n)
847 self.assertEqual(rawio._extraneous_reads, 0,
848 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
849
850
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000851class CBufferedReaderTest(BufferedReaderTest):
852 tp = io.BufferedReader
853
854 def test_constructor(self):
855 BufferedReaderTest.test_constructor(self)
856 # The allocation can succeed on 32-bit builds, e.g. with more
857 # than 2GB RAM and a 64-bit kernel.
858 if sys.maxsize > 0x7FFFFFFF:
859 rawio = self.MockRawIO()
860 bufio = self.tp(rawio)
861 self.assertRaises((OverflowError, MemoryError, ValueError),
862 bufio.__init__, rawio, sys.maxsize)
863
864 def test_initialization(self):
865 rawio = self.MockRawIO([b"abc"])
866 bufio = self.tp(rawio)
867 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
868 self.assertRaises(ValueError, bufio.read)
869 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
870 self.assertRaises(ValueError, bufio.read)
871 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
872 self.assertRaises(ValueError, bufio.read)
873
874 def test_misbehaved_io_read(self):
875 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
876 bufio = self.tp(rawio)
877 # _pyio.BufferedReader seems to implement reading different, so that
878 # checking this is not so easy.
879 self.assertRaises(IOError, bufio.read, 10)
880
881 def test_garbage_collection(self):
882 # C BufferedReader objects are collected.
883 # The Python version has __del__, so it ends into gc.garbage instead
884 rawio = self.FileIO(support.TESTFN, "w+b")
885 f = self.tp(rawio)
886 f.f = f
887 wr = weakref.ref(f)
888 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000889 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000890 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000891
892class PyBufferedReaderTest(BufferedReaderTest):
893 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000894
Guido van Rossuma9e20242007-03-08 00:43:48 +0000895
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000896class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
897 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000898
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000899 def test_constructor(self):
900 rawio = self.MockRawIO()
901 bufio = self.tp(rawio)
902 bufio.__init__(rawio)
903 bufio.__init__(rawio, buffer_size=1024)
904 bufio.__init__(rawio, buffer_size=16)
905 self.assertEquals(3, bufio.write(b"abc"))
906 bufio.flush()
907 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
908 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
909 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
910 bufio.__init__(rawio)
911 self.assertEquals(3, bufio.write(b"ghi"))
912 bufio.flush()
913 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
914
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000915 def test_detach_flush(self):
916 raw = self.MockRawIO()
917 buf = self.tp(raw)
918 buf.write(b"howdy!")
919 self.assertFalse(raw._write_stack)
920 buf.detach()
921 self.assertEqual(raw._write_stack, [b"howdy!"])
922
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000923 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000924 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000925 writer = self.MockRawIO()
926 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000927 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000928 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000929
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000930 def test_write_overflow(self):
931 writer = self.MockRawIO()
932 bufio = self.tp(writer, 8)
933 contents = b"abcdefghijklmnop"
934 for n in range(0, len(contents), 3):
935 bufio.write(contents[n:n+3])
936 flushed = b"".join(writer._write_stack)
937 # At least (total - 8) bytes were implicitly flushed, perhaps more
938 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000939 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000940
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000941 def check_writes(self, intermediate_func):
942 # Lots of writes, test the flushed output is as expected.
943 contents = bytes(range(256)) * 1000
944 n = 0
945 writer = self.MockRawIO()
946 bufio = self.tp(writer, 13)
947 # Generator of write sizes: repeat each N 15 times then proceed to N+1
948 def gen_sizes():
949 for size in count(1):
950 for i in range(15):
951 yield size
952 sizes = gen_sizes()
953 while n < len(contents):
954 size = min(next(sizes), len(contents) - n)
955 self.assertEquals(bufio.write(contents[n:n+size]), size)
956 intermediate_func(bufio)
957 n += size
958 bufio.flush()
959 self.assertEquals(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000960
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000961 def test_writes(self):
962 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000963
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000964 def test_writes_and_flushes(self):
965 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +0000966
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000967 def test_writes_and_seeks(self):
968 def _seekabs(bufio):
969 pos = bufio.tell()
970 bufio.seek(pos + 1, 0)
971 bufio.seek(pos - 1, 0)
972 bufio.seek(pos, 0)
973 self.check_writes(_seekabs)
974 def _seekrel(bufio):
975 pos = bufio.seek(0, 1)
976 bufio.seek(+1, 1)
977 bufio.seek(-1, 1)
978 bufio.seek(pos, 0)
979 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +0000980
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000981 def test_writes_and_truncates(self):
982 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +0000983
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000984 def test_write_non_blocking(self):
985 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +0000986 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +0000987
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000988 self.assertEquals(bufio.write(b"abcd"), 4)
989 self.assertEquals(bufio.write(b"efghi"), 5)
990 # 1 byte will be written, the rest will be buffered
991 raw.block_on(b"k")
992 self.assertEquals(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +0000993
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000994 # 8 bytes will be written, 8 will be buffered and the rest will be lost
995 raw.block_on(b"0")
996 try:
997 bufio.write(b"opqrwxyz0123456789")
998 except self.BlockingIOError as e:
999 written = e.characters_written
1000 else:
1001 self.fail("BlockingIOError should have been raised")
1002 self.assertEquals(written, 16)
1003 self.assertEquals(raw.pop_written(),
1004 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001005
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001006 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
1007 s = raw.pop_written()
1008 # Previously buffered bytes were flushed
1009 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001010
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001011 def test_write_and_rewind(self):
1012 raw = io.BytesIO()
1013 bufio = self.tp(raw, 4)
1014 self.assertEqual(bufio.write(b"abcdef"), 6)
1015 self.assertEqual(bufio.tell(), 6)
1016 bufio.seek(0, 0)
1017 self.assertEqual(bufio.write(b"XY"), 2)
1018 bufio.seek(6, 0)
1019 self.assertEqual(raw.getvalue(), b"XYcdef")
1020 self.assertEqual(bufio.write(b"123456"), 6)
1021 bufio.flush()
1022 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001023
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001024 def test_flush(self):
1025 writer = self.MockRawIO()
1026 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001027 bufio.write(b"abc")
1028 bufio.flush()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001029 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001030
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001031 def test_destructor(self):
1032 writer = self.MockRawIO()
1033 bufio = self.tp(writer, 8)
1034 bufio.write(b"abc")
1035 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001036 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001037 self.assertEquals(b"abc", writer._write_stack[0])
1038
1039 def test_truncate(self):
1040 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001041 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001042 bufio = self.tp(raw, 8)
1043 bufio.write(b"abcdef")
1044 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001045 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001046 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001047 self.assertEqual(f.read(), b"abc")
1048
Victor Stinner45df8202010-04-28 22:31:17 +00001049 @unittest.skipUnless(threading, 'Threading required for this test.')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001050 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001051 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001052 # Write out many bytes from many threads and test they were
1053 # all flushed.
1054 N = 1000
1055 contents = bytes(range(256)) * N
1056 sizes = cycle([1, 19])
1057 n = 0
1058 queue = deque()
1059 while n < len(contents):
1060 size = next(sizes)
1061 queue.append(contents[n:n+size])
1062 n += size
1063 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001064 # We use a real file object because it allows us to
1065 # exercise situations where the GIL is released before
1066 # writing the buffer to the raw streams. This is in addition
1067 # to concurrency issues due to switching threads in the middle
1068 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001069 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001070 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001071 errors = []
1072 def f():
1073 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001074 while True:
1075 try:
1076 s = queue.popleft()
1077 except IndexError:
1078 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001079 bufio.write(s)
1080 except Exception as e:
1081 errors.append(e)
1082 raise
1083 threads = [threading.Thread(target=f) for x in range(20)]
1084 for t in threads:
1085 t.start()
1086 time.sleep(0.02) # yield
1087 for t in threads:
1088 t.join()
1089 self.assertFalse(errors,
1090 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001091 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001092 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001093 s = f.read()
1094 for i in range(256):
1095 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001096 finally:
1097 support.unlink(support.TESTFN)
1098
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001099 def test_misbehaved_io(self):
1100 rawio = self.MisbehavedRawIO()
1101 bufio = self.tp(rawio, 5)
1102 self.assertRaises(IOError, bufio.seek, 0)
1103 self.assertRaises(IOError, bufio.tell)
1104 self.assertRaises(IOError, bufio.write, b"abcdef")
1105
Benjamin Peterson59406a92009-03-26 17:10:29 +00001106 def test_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001107 with support.check_warnings(("max_buffer_size is deprecated",
1108 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001109 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001110
1111
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001112class CBufferedWriterTest(BufferedWriterTest):
1113 tp = io.BufferedWriter
1114
1115 def test_constructor(self):
1116 BufferedWriterTest.test_constructor(self)
1117 # The allocation can succeed on 32-bit builds, e.g. with more
1118 # than 2GB RAM and a 64-bit kernel.
1119 if sys.maxsize > 0x7FFFFFFF:
1120 rawio = self.MockRawIO()
1121 bufio = self.tp(rawio)
1122 self.assertRaises((OverflowError, MemoryError, ValueError),
1123 bufio.__init__, rawio, sys.maxsize)
1124
1125 def test_initialization(self):
1126 rawio = self.MockRawIO()
1127 bufio = self.tp(rawio)
1128 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1129 self.assertRaises(ValueError, bufio.write, b"def")
1130 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1131 self.assertRaises(ValueError, bufio.write, b"def")
1132 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1133 self.assertRaises(ValueError, bufio.write, b"def")
1134
1135 def test_garbage_collection(self):
1136 # C BufferedWriter objects are collected, and collecting them flushes
1137 # all data to disk.
1138 # The Python version has __del__, so it ends into gc.garbage instead
1139 rawio = self.FileIO(support.TESTFN, "w+b")
1140 f = self.tp(rawio)
1141 f.write(b"123xxx")
1142 f.x = f
1143 wr = weakref.ref(f)
1144 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001145 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001146 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001147 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001148 self.assertEqual(f.read(), b"123xxx")
1149
1150
1151class PyBufferedWriterTest(BufferedWriterTest):
1152 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001153
Guido van Rossum01a27522007-03-07 01:00:12 +00001154class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001155
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001156 def test_constructor(self):
1157 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001158 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001159
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001160 def test_detach(self):
1161 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1162 self.assertRaises(self.UnsupportedOperation, pair.detach)
1163
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001164 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001165 with support.check_warnings(("max_buffer_size is deprecated",
1166 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001167 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001168
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001169 def test_constructor_with_not_readable(self):
1170 class NotReadable(MockRawIO):
1171 def readable(self):
1172 return False
1173
1174 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1175
1176 def test_constructor_with_not_writeable(self):
1177 class NotWriteable(MockRawIO):
1178 def writable(self):
1179 return False
1180
1181 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1182
1183 def test_read(self):
1184 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1185
1186 self.assertEqual(pair.read(3), b"abc")
1187 self.assertEqual(pair.read(1), b"d")
1188 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001189 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1190 self.assertEqual(pair.read(None), b"abc")
1191
1192 def test_readlines(self):
1193 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1194 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1195 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1196 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001197
1198 def test_read1(self):
1199 # .read1() is delegated to the underlying reader object, so this test
1200 # can be shallow.
1201 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1202
1203 self.assertEqual(pair.read1(3), b"abc")
1204
1205 def test_readinto(self):
1206 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1207
1208 data = bytearray(5)
1209 self.assertEqual(pair.readinto(data), 5)
1210 self.assertEqual(data, b"abcde")
1211
1212 def test_write(self):
1213 w = self.MockRawIO()
1214 pair = self.tp(self.MockRawIO(), w)
1215
1216 pair.write(b"abc")
1217 pair.flush()
1218 pair.write(b"def")
1219 pair.flush()
1220 self.assertEqual(w._write_stack, [b"abc", b"def"])
1221
1222 def test_peek(self):
1223 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1224
1225 self.assertTrue(pair.peek(3).startswith(b"abc"))
1226 self.assertEqual(pair.read(3), b"abc")
1227
1228 def test_readable(self):
1229 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1230 self.assertTrue(pair.readable())
1231
1232 def test_writeable(self):
1233 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1234 self.assertTrue(pair.writable())
1235
1236 def test_seekable(self):
1237 # BufferedRWPairs are never seekable, even if their readers and writers
1238 # are.
1239 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1240 self.assertFalse(pair.seekable())
1241
1242 # .flush() is delegated to the underlying writer object and has been
1243 # tested in the test_write method.
1244
1245 def test_close_and_closed(self):
1246 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1247 self.assertFalse(pair.closed)
1248 pair.close()
1249 self.assertTrue(pair.closed)
1250
1251 def test_isatty(self):
1252 class SelectableIsAtty(MockRawIO):
1253 def __init__(self, isatty):
1254 MockRawIO.__init__(self)
1255 self._isatty = isatty
1256
1257 def isatty(self):
1258 return self._isatty
1259
1260 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1261 self.assertFalse(pair.isatty())
1262
1263 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1264 self.assertTrue(pair.isatty())
1265
1266 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1267 self.assertTrue(pair.isatty())
1268
1269 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1270 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001271
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001272class CBufferedRWPairTest(BufferedRWPairTest):
1273 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001274
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001275class PyBufferedRWPairTest(BufferedRWPairTest):
1276 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001277
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001278
1279class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1280 read_mode = "rb+"
1281 write_mode = "wb+"
1282
1283 def test_constructor(self):
1284 BufferedReaderTest.test_constructor(self)
1285 BufferedWriterTest.test_constructor(self)
1286
1287 def test_read_and_write(self):
1288 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001289 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001290
1291 self.assertEqual(b"as", rw.read(2))
1292 rw.write(b"ddd")
1293 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001294 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001295 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001296 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001297
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001298 def test_seek_and_tell(self):
1299 raw = self.BytesIO(b"asdfghjkl")
1300 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001301
1302 self.assertEquals(b"as", rw.read(2))
1303 self.assertEquals(2, rw.tell())
1304 rw.seek(0, 0)
1305 self.assertEquals(b"asdf", rw.read(4))
1306
1307 rw.write(b"asdf")
1308 rw.seek(0, 0)
1309 self.assertEquals(b"asdfasdfl", rw.read())
1310 self.assertEquals(9, rw.tell())
1311 rw.seek(-4, 2)
1312 self.assertEquals(5, rw.tell())
1313 rw.seek(2, 1)
1314 self.assertEquals(7, rw.tell())
1315 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001316 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001317
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001318 def check_flush_and_read(self, read_func):
1319 raw = self.BytesIO(b"abcdefghi")
1320 bufio = self.tp(raw)
1321
1322 self.assertEquals(b"ab", read_func(bufio, 2))
1323 bufio.write(b"12")
1324 self.assertEquals(b"ef", read_func(bufio, 2))
1325 self.assertEquals(6, bufio.tell())
1326 bufio.flush()
1327 self.assertEquals(6, bufio.tell())
1328 self.assertEquals(b"ghi", read_func(bufio))
1329 raw.seek(0, 0)
1330 raw.write(b"XYZ")
1331 # flush() resets the read buffer
1332 bufio.flush()
1333 bufio.seek(0, 0)
1334 self.assertEquals(b"XYZ", read_func(bufio, 3))
1335
1336 def test_flush_and_read(self):
1337 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1338
1339 def test_flush_and_readinto(self):
1340 def _readinto(bufio, n=-1):
1341 b = bytearray(n if n >= 0 else 9999)
1342 n = bufio.readinto(b)
1343 return bytes(b[:n])
1344 self.check_flush_and_read(_readinto)
1345
1346 def test_flush_and_peek(self):
1347 def _peek(bufio, n=-1):
1348 # This relies on the fact that the buffer can contain the whole
1349 # raw stream, otherwise peek() can return less.
1350 b = bufio.peek(n)
1351 if n != -1:
1352 b = b[:n]
1353 bufio.seek(len(b), 1)
1354 return b
1355 self.check_flush_and_read(_peek)
1356
1357 def test_flush_and_write(self):
1358 raw = self.BytesIO(b"abcdefghi")
1359 bufio = self.tp(raw)
1360
1361 bufio.write(b"123")
1362 bufio.flush()
1363 bufio.write(b"45")
1364 bufio.flush()
1365 bufio.seek(0, 0)
1366 self.assertEquals(b"12345fghi", raw.getvalue())
1367 self.assertEquals(b"12345fghi", bufio.read())
1368
1369 def test_threads(self):
1370 BufferedReaderTest.test_threads(self)
1371 BufferedWriterTest.test_threads(self)
1372
1373 def test_writes_and_peek(self):
1374 def _peek(bufio):
1375 bufio.peek(1)
1376 self.check_writes(_peek)
1377 def _peek(bufio):
1378 pos = bufio.tell()
1379 bufio.seek(-1, 1)
1380 bufio.peek(1)
1381 bufio.seek(pos, 0)
1382 self.check_writes(_peek)
1383
1384 def test_writes_and_reads(self):
1385 def _read(bufio):
1386 bufio.seek(-1, 1)
1387 bufio.read(1)
1388 self.check_writes(_read)
1389
1390 def test_writes_and_read1s(self):
1391 def _read1(bufio):
1392 bufio.seek(-1, 1)
1393 bufio.read1(1)
1394 self.check_writes(_read1)
1395
1396 def test_writes_and_readintos(self):
1397 def _read(bufio):
1398 bufio.seek(-1, 1)
1399 bufio.readinto(bytearray(1))
1400 self.check_writes(_read)
1401
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001402 def test_write_after_readahead(self):
1403 # Issue #6629: writing after the buffer was filled by readahead should
1404 # first rewind the raw stream.
1405 for overwrite_size in [1, 5]:
1406 raw = self.BytesIO(b"A" * 10)
1407 bufio = self.tp(raw, 4)
1408 # Trigger readahead
1409 self.assertEqual(bufio.read(1), b"A")
1410 self.assertEqual(bufio.tell(), 1)
1411 # Overwriting should rewind the raw stream if it needs so
1412 bufio.write(b"B" * overwrite_size)
1413 self.assertEqual(bufio.tell(), overwrite_size + 1)
1414 # If the write size was smaller than the buffer size, flush() and
1415 # check that rewind happens.
1416 bufio.flush()
1417 self.assertEqual(bufio.tell(), overwrite_size + 1)
1418 s = raw.getvalue()
1419 self.assertEqual(s,
1420 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1421
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001422 def test_truncate_after_read_or_write(self):
1423 raw = self.BytesIO(b"A" * 10)
1424 bufio = self.tp(raw, 100)
1425 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1426 self.assertEqual(bufio.truncate(), 2)
1427 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1428 self.assertEqual(bufio.truncate(), 4)
1429
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001430 def test_misbehaved_io(self):
1431 BufferedReaderTest.test_misbehaved_io(self)
1432 BufferedWriterTest.test_misbehaved_io(self)
1433
1434class CBufferedRandomTest(BufferedRandomTest):
1435 tp = io.BufferedRandom
1436
1437 def test_constructor(self):
1438 BufferedRandomTest.test_constructor(self)
1439 # The allocation can succeed on 32-bit builds, e.g. with more
1440 # than 2GB RAM and a 64-bit kernel.
1441 if sys.maxsize > 0x7FFFFFFF:
1442 rawio = self.MockRawIO()
1443 bufio = self.tp(rawio)
1444 self.assertRaises((OverflowError, MemoryError, ValueError),
1445 bufio.__init__, rawio, sys.maxsize)
1446
1447 def test_garbage_collection(self):
1448 CBufferedReaderTest.test_garbage_collection(self)
1449 CBufferedWriterTest.test_garbage_collection(self)
1450
1451class PyBufferedRandomTest(BufferedRandomTest):
1452 tp = pyio.BufferedRandom
1453
1454
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001455# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1456# properties:
1457# - A single output character can correspond to many bytes of input.
1458# - The number of input bytes to complete the character can be
1459# undetermined until the last input byte is received.
1460# - The number of input bytes can vary depending on previous input.
1461# - A single input byte can correspond to many characters of output.
1462# - The number of output characters can be undetermined until the
1463# last input byte is received.
1464# - The number of output characters can vary depending on previous input.
1465
1466class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1467 """
1468 For testing seek/tell behavior with a stateful, buffering decoder.
1469
1470 Input is a sequence of words. Words may be fixed-length (length set
1471 by input) or variable-length (period-terminated). In variable-length
1472 mode, extra periods are ignored. Possible words are:
1473 - 'i' followed by a number sets the input length, I (maximum 99).
1474 When I is set to 0, words are space-terminated.
1475 - 'o' followed by a number sets the output length, O (maximum 99).
1476 - Any other word is converted into a word followed by a period on
1477 the output. The output word consists of the input word truncated
1478 or padded out with hyphens to make its length equal to O. If O
1479 is 0, the word is output verbatim without truncating or padding.
1480 I and O are initially set to 1. When I changes, any buffered input is
1481 re-scanned according to the new I. EOF also terminates the last word.
1482 """
1483
1484 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001485 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001486 self.reset()
1487
1488 def __repr__(self):
1489 return '<SID %x>' % id(self)
1490
1491 def reset(self):
1492 self.i = 1
1493 self.o = 1
1494 self.buffer = bytearray()
1495
1496 def getstate(self):
1497 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1498 return bytes(self.buffer), i*100 + o
1499
1500 def setstate(self, state):
1501 buffer, io = state
1502 self.buffer = bytearray(buffer)
1503 i, o = divmod(io, 100)
1504 self.i, self.o = i ^ 1, o ^ 1
1505
1506 def decode(self, input, final=False):
1507 output = ''
1508 for b in input:
1509 if self.i == 0: # variable-length, terminated with period
1510 if b == ord('.'):
1511 if self.buffer:
1512 output += self.process_word()
1513 else:
1514 self.buffer.append(b)
1515 else: # fixed-length, terminate after self.i bytes
1516 self.buffer.append(b)
1517 if len(self.buffer) == self.i:
1518 output += self.process_word()
1519 if final and self.buffer: # EOF terminates the last word
1520 output += self.process_word()
1521 return output
1522
1523 def process_word(self):
1524 output = ''
1525 if self.buffer[0] == ord('i'):
1526 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1527 elif self.buffer[0] == ord('o'):
1528 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1529 else:
1530 output = self.buffer.decode('ascii')
1531 if len(output) < self.o:
1532 output += '-'*self.o # pad out with hyphens
1533 if self.o:
1534 output = output[:self.o] # truncate to output length
1535 output += '.'
1536 self.buffer = bytearray()
1537 return output
1538
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001539 codecEnabled = False
1540
1541 @classmethod
1542 def lookupTestDecoder(cls, name):
1543 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001544 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001545 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001546 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001547 incrementalencoder=None,
1548 streamreader=None, streamwriter=None,
1549 incrementaldecoder=cls)
1550
1551# Register the previous decoder for testing.
1552# Disabled by default, tests will enable it.
1553codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1554
1555
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001556class StatefulIncrementalDecoderTest(unittest.TestCase):
1557 """
1558 Make sure the StatefulIncrementalDecoder actually works.
1559 """
1560
1561 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001562 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001563 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001564 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001565 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001566 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001567 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001568 # I=0, O=6 (variable-length input, fixed-length output)
1569 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1570 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001571 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001572 # I=6, O=3 (fixed-length input > fixed-length output)
1573 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1574 # I=0, then 3; O=29, then 15 (with longer output)
1575 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1576 'a----------------------------.' +
1577 'b----------------------------.' +
1578 'cde--------------------------.' +
1579 'abcdefghijabcde.' +
1580 'a.b------------.' +
1581 '.c.------------.' +
1582 'd.e------------.' +
1583 'k--------------.' +
1584 'l--------------.' +
1585 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001586 ]
1587
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001588 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001589 # Try a few one-shot test cases.
1590 for input, eof, output in self.test_cases:
1591 d = StatefulIncrementalDecoder()
1592 self.assertEquals(d.decode(input, eof), output)
1593
1594 # Also test an unfinished decode, followed by forcing EOF.
1595 d = StatefulIncrementalDecoder()
1596 self.assertEquals(d.decode(b'oiabcd'), '')
1597 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001598
1599class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001600
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001601 def setUp(self):
1602 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1603 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001604 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001605
Guido van Rossumd0712812007-04-11 16:32:43 +00001606 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001607 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001608
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001609 def test_constructor(self):
1610 r = self.BytesIO(b"\xc3\xa9\n\n")
1611 b = self.BufferedReader(r, 1000)
1612 t = self.TextIOWrapper(b)
1613 t.__init__(b, encoding="latin1", newline="\r\n")
1614 self.assertEquals(t.encoding, "latin1")
1615 self.assertEquals(t.line_buffering, False)
1616 t.__init__(b, encoding="utf8", line_buffering=True)
1617 self.assertEquals(t.encoding, "utf8")
1618 self.assertEquals(t.line_buffering, True)
1619 self.assertEquals("\xe9\n", t.readline())
1620 self.assertRaises(TypeError, t.__init__, b, newline=42)
1621 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1622
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001623 def test_detach(self):
1624 r = self.BytesIO()
1625 b = self.BufferedWriter(r)
1626 t = self.TextIOWrapper(b)
1627 self.assertIs(t.detach(), b)
1628
1629 t = self.TextIOWrapper(b, encoding="ascii")
1630 t.write("howdy")
1631 self.assertFalse(r.getvalue())
1632 t.detach()
1633 self.assertEqual(r.getvalue(), b"howdy")
1634 self.assertRaises(ValueError, t.detach)
1635
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001636 def test_repr(self):
1637 raw = self.BytesIO("hello".encode("utf-8"))
1638 b = self.BufferedReader(raw)
1639 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001640 modname = self.TextIOWrapper.__module__
1641 self.assertEqual(repr(t),
1642 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1643 raw.name = "dummy"
1644 self.assertEqual(repr(t),
1645 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1646 raw.name = b"dummy"
1647 self.assertEqual(repr(t),
1648 "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001649
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001650 def test_line_buffering(self):
1651 r = self.BytesIO()
1652 b = self.BufferedWriter(r, 1000)
1653 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001654 t.write("X")
1655 self.assertEquals(r.getvalue(), b"") # No flush happened
1656 t.write("Y\nZ")
1657 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1658 t.write("A\rB")
1659 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1660
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001661 def test_encoding(self):
1662 # Check the encoding attribute is always set, and valid
1663 b = self.BytesIO()
1664 t = self.TextIOWrapper(b, encoding="utf8")
1665 self.assertEqual(t.encoding, "utf8")
1666 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001667 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001668 codecs.lookup(t.encoding)
1669
1670 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001671 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001672 b = self.BytesIO(b"abc\n\xff\n")
1673 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001674 self.assertRaises(UnicodeError, t.read)
1675 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001676 b = self.BytesIO(b"abc\n\xff\n")
1677 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001678 self.assertRaises(UnicodeError, t.read)
1679 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001680 b = self.BytesIO(b"abc\n\xff\n")
1681 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001682 self.assertEquals(t.read(), "abc\n\n")
1683 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001684 b = self.BytesIO(b"abc\n\xff\n")
1685 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001686 self.assertEquals(t.read(), "abc\n\ufffd\n")
1687
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001688 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001689 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001690 b = self.BytesIO()
1691 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001692 self.assertRaises(UnicodeError, t.write, "\xff")
1693 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001694 b = self.BytesIO()
1695 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001696 self.assertRaises(UnicodeError, t.write, "\xff")
1697 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001698 b = self.BytesIO()
1699 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001700 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001701 t.write("abc\xffdef\n")
1702 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001703 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001704 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001705 b = self.BytesIO()
1706 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001707 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001708 t.write("abc\xffdef\n")
1709 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001710 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001711
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001712 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001713 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1714
1715 tests = [
1716 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001717 [ '', input_lines ],
1718 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1719 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1720 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001721 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001722 encodings = (
1723 'utf-8', 'latin-1',
1724 'utf-16', 'utf-16-le', 'utf-16-be',
1725 'utf-32', 'utf-32-le', 'utf-32-be',
1726 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001727
Guido van Rossum8358db22007-08-18 21:39:55 +00001728 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001729 # character in TextIOWrapper._pending_line.
1730 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001731 # XXX: str.encode() should return bytes
1732 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001733 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001734 for bufsize in range(1, 10):
1735 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001736 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1737 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001738 encoding=encoding)
1739 if do_reads:
1740 got_lines = []
1741 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001742 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001743 if c2 == '':
1744 break
1745 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001746 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001747 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001748 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001749
1750 for got_line, exp_line in zip(got_lines, exp_lines):
1751 self.assertEquals(got_line, exp_line)
1752 self.assertEquals(len(got_lines), len(exp_lines))
1753
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001754 def test_newlines_input(self):
1755 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001756 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1757 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001758 (None, normalized.decode("ascii").splitlines(True)),
1759 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001760 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1761 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1762 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001763 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001764 buf = self.BytesIO(testdata)
1765 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001766 self.assertEquals(txt.readlines(), expected)
1767 txt.seek(0)
1768 self.assertEquals(txt.read(), "".join(expected))
1769
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001770 def test_newlines_output(self):
1771 testdict = {
1772 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1773 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1774 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1775 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1776 }
1777 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1778 for newline, expected in tests:
1779 buf = self.BytesIO()
1780 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1781 txt.write("AAA\nB")
1782 txt.write("BB\nCCC\n")
1783 txt.write("X\rY\r\nZ")
1784 txt.flush()
1785 self.assertEquals(buf.closed, False)
1786 self.assertEquals(buf.getvalue(), expected)
1787
1788 def test_destructor(self):
1789 l = []
1790 base = self.BytesIO
1791 class MyBytesIO(base):
1792 def close(self):
1793 l.append(self.getvalue())
1794 base.close(self)
1795 b = MyBytesIO()
1796 t = self.TextIOWrapper(b, encoding="ascii")
1797 t.write("abc")
1798 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001799 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001800 self.assertEquals([b"abc"], l)
1801
1802 def test_override_destructor(self):
1803 record = []
1804 class MyTextIO(self.TextIOWrapper):
1805 def __del__(self):
1806 record.append(1)
1807 try:
1808 f = super().__del__
1809 except AttributeError:
1810 pass
1811 else:
1812 f()
1813 def close(self):
1814 record.append(2)
1815 super().close()
1816 def flush(self):
1817 record.append(3)
1818 super().flush()
1819 b = self.BytesIO()
1820 t = MyTextIO(b, encoding="ascii")
1821 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001822 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001823 self.assertEqual(record, [1, 2, 3])
1824
1825 def test_error_through_destructor(self):
1826 # Test that the exception state is not modified by a destructor,
1827 # even if close() fails.
1828 rawio = self.CloseFailureIO()
1829 def f():
1830 self.TextIOWrapper(rawio).xyzzy
1831 with support.captured_output("stderr") as s:
1832 self.assertRaises(AttributeError, f)
1833 s = s.getvalue().strip()
1834 if s:
1835 # The destructor *may* have printed an unraisable error, check it
1836 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001837 self.assertTrue(s.startswith("Exception IOError: "), s)
1838 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001839
Guido van Rossum9b76da62007-04-11 01:09:03 +00001840 # Systematic tests of the text I/O API
1841
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001842 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001843 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1844 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001845 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001846 f._CHUNK_SIZE = chunksize
1847 self.assertEquals(f.write("abc"), 3)
1848 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001849 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001850 f._CHUNK_SIZE = chunksize
1851 self.assertEquals(f.tell(), 0)
1852 self.assertEquals(f.read(), "abc")
1853 cookie = f.tell()
1854 self.assertEquals(f.seek(0), 0)
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001855 self.assertEquals(f.read(None), "abc")
1856 f.seek(0)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001857 self.assertEquals(f.read(2), "ab")
1858 self.assertEquals(f.read(1), "c")
1859 self.assertEquals(f.read(1), "")
1860 self.assertEquals(f.read(), "")
1861 self.assertEquals(f.tell(), cookie)
1862 self.assertEquals(f.seek(0), 0)
1863 self.assertEquals(f.seek(0, 2), cookie)
1864 self.assertEquals(f.write("def"), 3)
1865 self.assertEquals(f.seek(cookie), cookie)
1866 self.assertEquals(f.read(), "def")
1867 if enc.startswith("utf"):
1868 self.multi_line_test(f, enc)
1869 f.close()
1870
1871 def multi_line_test(self, f, enc):
1872 f.seek(0)
1873 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001874 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001875 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001876 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001877 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001878 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001879 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001880 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001881 wlines.append((f.tell(), line))
1882 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001883 f.seek(0)
1884 rlines = []
1885 while True:
1886 pos = f.tell()
1887 line = f.readline()
1888 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001889 break
1890 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +00001891 self.assertEquals(rlines, wlines)
1892
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001893 def test_telling(self):
1894 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001895 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001896 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001897 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001898 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001899 p2 = f.tell()
1900 f.seek(0)
1901 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001902 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001903 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001904 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001905 self.assertEquals(f.tell(), p2)
1906 f.seek(0)
1907 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001908 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001909 self.assertRaises(IOError, f.tell)
1910 self.assertEquals(f.tell(), p2)
1911 f.close()
1912
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001913 def test_seeking(self):
1914 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001915 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001916 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001917 prefix = bytes(u_prefix.encode("utf-8"))
1918 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001919 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001920 suffix = bytes(u_suffix.encode("utf-8"))
1921 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001922 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001923 f.write(line*2)
1924 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001925 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001926 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001927 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001928 self.assertEquals(f.tell(), prefix_size)
1929 self.assertEquals(f.readline(), u_suffix)
1930
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001931 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00001932 # Regression test for a specific bug
1933 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001934 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001935 f.write(data)
1936 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001937 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001938 f._CHUNK_SIZE # Just test that it exists
1939 f._CHUNK_SIZE = 2
1940 f.readline()
1941 f.tell()
1942
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001943 def test_seek_and_tell(self):
1944 #Test seek/tell using the StatefulIncrementalDecoder.
1945 # Make test faster by doing smaller seeks
1946 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001947
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001948 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001949 """Tell/seek to various points within a data stream and ensure
1950 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001951 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001952 f.write(data)
1953 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001954 f = self.open(support.TESTFN, encoding='test_decoder')
1955 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001956 decoded = f.read()
1957 f.close()
1958
Neal Norwitze2b07052008-03-18 19:52:05 +00001959 for i in range(min_pos, len(decoded) + 1): # seek positions
1960 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001961 f = self.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001962 self.assertEquals(f.read(i), decoded[:i])
1963 cookie = f.tell()
1964 self.assertEquals(f.read(j), decoded[i:i + j])
1965 f.seek(cookie)
1966 self.assertEquals(f.read(), decoded[i:])
1967 f.close()
1968
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001969 # Enable the test decoder.
1970 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001971
1972 # Run the tests.
1973 try:
1974 # Try each test case.
1975 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001976 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001977
1978 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001979 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1980 offset = CHUNK_SIZE - len(input)//2
1981 prefix = b'.'*offset
1982 # Don't bother seeking into the prefix (takes too long).
1983 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001984 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001985
1986 # Ensure our test decoder won't interfere with subsequent tests.
1987 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001988 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001989
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001990 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001991 data = "1234567890"
1992 tests = ("utf-16",
1993 "utf-16-le",
1994 "utf-16-be",
1995 "utf-32",
1996 "utf-32-le",
1997 "utf-32-be")
1998 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001999 buf = self.BytesIO()
2000 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002001 # Check if the BOM is written only once (see issue1753).
2002 f.write(data)
2003 f.write(data)
2004 f.seek(0)
2005 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00002006 f.seek(0)
2007 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00002008 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
2009
Benjamin Petersona1b49012009-03-31 23:11:32 +00002010 def test_unreadable(self):
2011 class UnReadable(self.BytesIO):
2012 def readable(self):
2013 return False
2014 txt = self.TextIOWrapper(UnReadable())
2015 self.assertRaises(IOError, txt.read)
2016
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002017 def test_read_one_by_one(self):
2018 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002019 reads = ""
2020 while True:
2021 c = txt.read(1)
2022 if not c:
2023 break
2024 reads += c
2025 self.assertEquals(reads, "AA\nBB")
2026
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002027 def test_readlines(self):
2028 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2029 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2030 txt.seek(0)
2031 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2032 txt.seek(0)
2033 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2034
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002035 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002036 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002037 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002038 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002039 reads = ""
2040 while True:
2041 c = txt.read(128)
2042 if not c:
2043 break
2044 reads += c
2045 self.assertEquals(reads, "A"*127+"\nB")
2046
2047 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002048 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002049
2050 # read one char at a time
2051 reads = ""
2052 while True:
2053 c = txt.read(1)
2054 if not c:
2055 break
2056 reads += c
2057 self.assertEquals(reads, self.normalized)
2058
2059 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002060 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002061 txt._CHUNK_SIZE = 4
2062
2063 reads = ""
2064 while True:
2065 c = txt.read(4)
2066 if not c:
2067 break
2068 reads += c
2069 self.assertEquals(reads, self.normalized)
2070
2071 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002072 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002073 txt._CHUNK_SIZE = 4
2074
2075 reads = txt.read(4)
2076 reads += txt.read(4)
2077 reads += txt.readline()
2078 reads += txt.readline()
2079 reads += txt.readline()
2080 self.assertEquals(reads, self.normalized)
2081
2082 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002083 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002084 txt._CHUNK_SIZE = 4
2085
2086 reads = txt.read(4)
2087 reads += txt.read()
2088 self.assertEquals(reads, self.normalized)
2089
2090 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002091 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002092 txt._CHUNK_SIZE = 4
2093
2094 reads = txt.read(4)
2095 pos = txt.tell()
2096 txt.seek(0)
2097 txt.seek(pos)
2098 self.assertEquals(txt.read(4), "BBB\n")
2099
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002100 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002101 buffer = self.BytesIO(self.testdata)
2102 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002103
2104 self.assertEqual(buffer.seekable(), txt.seekable())
2105
Antoine Pitroue4501852009-05-14 18:55:55 +00002106 def test_append_bom(self):
2107 # The BOM is not written again when appending to a non-empty file
2108 filename = support.TESTFN
2109 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2110 with self.open(filename, 'w', encoding=charset) as f:
2111 f.write('aaa')
2112 pos = f.tell()
2113 with self.open(filename, 'rb') as f:
2114 self.assertEquals(f.read(), 'aaa'.encode(charset))
2115
2116 with self.open(filename, 'a', encoding=charset) as f:
2117 f.write('xxx')
2118 with self.open(filename, 'rb') as f:
2119 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
2120
2121 def test_seek_bom(self):
2122 # Same test, but when seeking manually
2123 filename = support.TESTFN
2124 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2125 with self.open(filename, 'w', encoding=charset) as f:
2126 f.write('aaa')
2127 pos = f.tell()
2128 with self.open(filename, 'r+', encoding=charset) as f:
2129 f.seek(pos)
2130 f.write('zzz')
2131 f.seek(0)
2132 f.write('bbb')
2133 with self.open(filename, 'rb') as f:
2134 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2135
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002136 def test_errors_property(self):
2137 with self.open(support.TESTFN, "w") as f:
2138 self.assertEqual(f.errors, "strict")
2139 with self.open(support.TESTFN, "w", errors="replace") as f:
2140 self.assertEqual(f.errors, "replace")
2141
Victor Stinner45df8202010-04-28 22:31:17 +00002142 @unittest.skipUnless(threading, 'Threading required for this test.')
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002143 def test_threads_write(self):
2144 # Issue6750: concurrent writes could duplicate data
2145 event = threading.Event()
2146 with self.open(support.TESTFN, "w", buffering=1) as f:
2147 def run(n):
2148 text = "Thread%03d\n" % n
2149 event.wait()
2150 f.write(text)
2151 threads = [threading.Thread(target=lambda n=x: run(n))
2152 for x in range(20)]
2153 for t in threads:
2154 t.start()
2155 time.sleep(0.02)
2156 event.set()
2157 for t in threads:
2158 t.join()
2159 with self.open(support.TESTFN) as f:
2160 content = f.read()
2161 for n in range(20):
2162 self.assertEquals(content.count("Thread%03d\n" % n), 1)
2163
Antoine Pitrou6be88762010-05-03 16:48:20 +00002164 def test_flush_error_on_close(self):
2165 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2166 def bad_flush():
2167 raise IOError()
2168 txt.flush = bad_flush
2169 self.assertRaises(IOError, txt.close) # exception not swallowed
2170
2171 def test_multi_close(self):
2172 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2173 txt.close()
2174 txt.close()
2175 txt.close()
2176 self.assertRaises(ValueError, txt.flush)
2177
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002178class CTextIOWrapperTest(TextIOWrapperTest):
2179
2180 def test_initialization(self):
2181 r = self.BytesIO(b"\xc3\xa9\n\n")
2182 b = self.BufferedReader(r, 1000)
2183 t = self.TextIOWrapper(b)
2184 self.assertRaises(TypeError, t.__init__, b, newline=42)
2185 self.assertRaises(ValueError, t.read)
2186 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2187 self.assertRaises(ValueError, t.read)
2188
2189 def test_garbage_collection(self):
2190 # C TextIOWrapper objects are collected, and collecting them flushes
2191 # all data to disk.
2192 # The Python version has __del__, so it ends in gc.garbage instead.
2193 rawio = io.FileIO(support.TESTFN, "wb")
2194 b = self.BufferedWriter(rawio)
2195 t = self.TextIOWrapper(b, encoding="ascii")
2196 t.write("456def")
2197 t.x = t
2198 wr = weakref.ref(t)
2199 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002200 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002201 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002202 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002203 self.assertEqual(f.read(), b"456def")
2204
2205class PyTextIOWrapperTest(TextIOWrapperTest):
2206 pass
2207
2208
2209class IncrementalNewlineDecoderTest(unittest.TestCase):
2210
2211 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002212 # UTF-8 specific tests for a newline decoder
2213 def _check_decode(b, s, **kwargs):
2214 # We exercise getstate() / setstate() as well as decode()
2215 state = decoder.getstate()
2216 self.assertEquals(decoder.decode(b, **kwargs), s)
2217 decoder.setstate(state)
2218 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002219
Antoine Pitrou180a3362008-12-14 16:36:46 +00002220 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002221
Antoine Pitrou180a3362008-12-14 16:36:46 +00002222 _check_decode(b'\xe8', "")
2223 _check_decode(b'\xa2', "")
2224 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002225
Antoine Pitrou180a3362008-12-14 16:36:46 +00002226 _check_decode(b'\xe8', "")
2227 _check_decode(b'\xa2', "")
2228 _check_decode(b'\x88', "\u8888")
2229
2230 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002231 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2232
Antoine Pitrou180a3362008-12-14 16:36:46 +00002233 decoder.reset()
2234 _check_decode(b'\n', "\n")
2235 _check_decode(b'\r', "")
2236 _check_decode(b'', "\n", final=True)
2237 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002238
Antoine Pitrou180a3362008-12-14 16:36:46 +00002239 _check_decode(b'\r', "")
2240 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002241
Antoine Pitrou180a3362008-12-14 16:36:46 +00002242 _check_decode(b'\r\r\n', "\n\n")
2243 _check_decode(b'\r', "")
2244 _check_decode(b'\r', "\n")
2245 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002246
Antoine Pitrou180a3362008-12-14 16:36:46 +00002247 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2248 _check_decode(b'\xe8\xa2\x88', "\u8888")
2249 _check_decode(b'\n', "\n")
2250 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2251 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002252
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002253 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002254 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002255 if encoding is not None:
2256 encoder = codecs.getincrementalencoder(encoding)()
2257 def _decode_bytewise(s):
2258 # Decode one byte at a time
2259 for b in encoder.encode(s):
2260 result.append(decoder.decode(bytes([b])))
2261 else:
2262 encoder = None
2263 def _decode_bytewise(s):
2264 # Decode one char at a time
2265 for c in s:
2266 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002267 self.assertEquals(decoder.newlines, None)
2268 _decode_bytewise("abc\n\r")
2269 self.assertEquals(decoder.newlines, '\n')
2270 _decode_bytewise("\nabc")
2271 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2272 _decode_bytewise("abc\r")
2273 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2274 _decode_bytewise("abc")
2275 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2276 _decode_bytewise("abc\r")
2277 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2278 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002279 input = "abc"
2280 if encoder is not None:
2281 encoder.reset()
2282 input = encoder.encode(input)
2283 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002284 self.assertEquals(decoder.newlines, None)
2285
2286 def test_newline_decoder(self):
2287 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002288 # None meaning the IncrementalNewlineDecoder takes unicode input
2289 # rather than bytes input
2290 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002291 'utf-16', 'utf-16-le', 'utf-16-be',
2292 'utf-32', 'utf-32-le', 'utf-32-be',
2293 )
2294 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002295 decoder = enc and codecs.getincrementaldecoder(enc)()
2296 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2297 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002298 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002299 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2300 self.check_newline_decoding_utf8(decoder)
2301
Antoine Pitrou66913e22009-03-06 23:40:56 +00002302 def test_newline_bytes(self):
2303 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2304 def _check(dec):
2305 self.assertEquals(dec.newlines, None)
2306 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2307 self.assertEquals(dec.newlines, None)
2308 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2309 self.assertEquals(dec.newlines, None)
2310 dec = self.IncrementalNewlineDecoder(None, translate=False)
2311 _check(dec)
2312 dec = self.IncrementalNewlineDecoder(None, translate=True)
2313 _check(dec)
2314
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002315class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2316 pass
2317
2318class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2319 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002320
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002321
Guido van Rossum01a27522007-03-07 01:00:12 +00002322# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002323
Guido van Rossum5abbf752007-08-27 17:39:33 +00002324class MiscIOTest(unittest.TestCase):
2325
Barry Warsaw40e82462008-11-20 20:14:50 +00002326 def tearDown(self):
2327 support.unlink(support.TESTFN)
2328
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002329 def test___all__(self):
2330 for name in self.io.__all__:
2331 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002332 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002333 if name == "open":
2334 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002335 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002336 self.assertTrue(issubclass(obj, Exception), name)
2337 elif not name.startswith("SEEK_"):
2338 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002339
Barry Warsaw40e82462008-11-20 20:14:50 +00002340 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002341 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002342 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002343 f.close()
2344
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002345 f = self.open(support.TESTFN, "U")
Barry Warsaw40e82462008-11-20 20:14:50 +00002346 self.assertEquals(f.name, support.TESTFN)
2347 self.assertEquals(f.buffer.name, support.TESTFN)
2348 self.assertEquals(f.buffer.raw.name, support.TESTFN)
2349 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002350 self.assertEquals(f.buffer.mode, "rb")
2351 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002352 f.close()
2353
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002354 f = self.open(support.TESTFN, "w+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002355 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002356 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2357 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002358
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002359 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002360 self.assertEquals(g.mode, "wb")
2361 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002362 self.assertEquals(g.name, f.fileno())
2363 self.assertEquals(g.raw.name, f.fileno())
2364 f.close()
2365 g.close()
2366
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002367 def test_io_after_close(self):
2368 for kwargs in [
2369 {"mode": "w"},
2370 {"mode": "wb"},
2371 {"mode": "w", "buffering": 1},
2372 {"mode": "w", "buffering": 2},
2373 {"mode": "wb", "buffering": 0},
2374 {"mode": "r"},
2375 {"mode": "rb"},
2376 {"mode": "r", "buffering": 1},
2377 {"mode": "r", "buffering": 2},
2378 {"mode": "rb", "buffering": 0},
2379 {"mode": "w+"},
2380 {"mode": "w+b"},
2381 {"mode": "w+", "buffering": 1},
2382 {"mode": "w+", "buffering": 2},
2383 {"mode": "w+b", "buffering": 0},
2384 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002385 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002386 f.close()
2387 self.assertRaises(ValueError, f.flush)
2388 self.assertRaises(ValueError, f.fileno)
2389 self.assertRaises(ValueError, f.isatty)
2390 self.assertRaises(ValueError, f.__iter__)
2391 if hasattr(f, "peek"):
2392 self.assertRaises(ValueError, f.peek, 1)
2393 self.assertRaises(ValueError, f.read)
2394 if hasattr(f, "read1"):
2395 self.assertRaises(ValueError, f.read1, 1024)
2396 if hasattr(f, "readinto"):
2397 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2398 self.assertRaises(ValueError, f.readline)
2399 self.assertRaises(ValueError, f.readlines)
2400 self.assertRaises(ValueError, f.seek, 0)
2401 self.assertRaises(ValueError, f.tell)
2402 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002403 self.assertRaises(ValueError, f.write,
2404 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002405 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002406 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002407
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002408 def test_blockingioerror(self):
2409 # Various BlockingIOError issues
2410 self.assertRaises(TypeError, self.BlockingIOError)
2411 self.assertRaises(TypeError, self.BlockingIOError, 1)
2412 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2413 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2414 b = self.BlockingIOError(1, "")
2415 self.assertEqual(b.characters_written, 0)
2416 class C(str):
2417 pass
2418 c = C("")
2419 b = self.BlockingIOError(1, c)
2420 c.b = b
2421 b.c = c
2422 wr = weakref.ref(c)
2423 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002424 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002425 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002426
2427 def test_abcs(self):
2428 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002429 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2430 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2431 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2432 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002433
2434 def _check_abc_inheritance(self, abcmodule):
2435 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002436 self.assertIsInstance(f, abcmodule.IOBase)
2437 self.assertIsInstance(f, abcmodule.RawIOBase)
2438 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2439 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002440 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002441 self.assertIsInstance(f, abcmodule.IOBase)
2442 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2443 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2444 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002445 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002446 self.assertIsInstance(f, abcmodule.IOBase)
2447 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2448 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2449 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002450
2451 def test_abc_inheritance(self):
2452 # Test implementations inherit from their respective ABCs
2453 self._check_abc_inheritance(self)
2454
2455 def test_abc_inheritance_official(self):
2456 # Test implementations inherit from the official ABCs of the
2457 # baseline "io" module.
2458 self._check_abc_inheritance(io)
2459
2460class CMiscIOTest(MiscIOTest):
2461 io = io
2462
2463class PyMiscIOTest(MiscIOTest):
2464 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002465
Guido van Rossum28524c72007-02-27 05:47:44 +00002466def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002467 tests = (CIOTest, PyIOTest,
2468 CBufferedReaderTest, PyBufferedReaderTest,
2469 CBufferedWriterTest, PyBufferedWriterTest,
2470 CBufferedRWPairTest, PyBufferedRWPairTest,
2471 CBufferedRandomTest, PyBufferedRandomTest,
2472 StatefulIncrementalDecoderTest,
2473 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2474 CTextIOWrapperTest, PyTextIOWrapperTest,
2475 CMiscIOTest, PyMiscIOTest,)
2476
2477 # Put the namespaces of the IO module we are testing and some useful mock
2478 # classes in the __dict__ of each test.
2479 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
2480 MockNonBlockWriterIO)
2481 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2482 c_io_ns = {name : getattr(io, name) for name in all_members}
2483 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2484 globs = globals()
2485 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2486 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2487 # Avoid turning open into a bound method.
2488 py_io_ns["open"] = pyio.OpenWrapper
2489 for test in tests:
2490 if test.__name__.startswith("C"):
2491 for name, obj in c_io_ns.items():
2492 setattr(test, name, obj)
2493 elif test.__name__.startswith("Py"):
2494 for name, obj in py_io_ns.items():
2495 setattr(test, name, obj)
2496
2497 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002498
2499if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002500 test_main()