blob: 2f76fed7fc2e401f70c4a0380c344f8ef65335c7 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
33from itertools import chain, cycle, count
34from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000035from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000036
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000037import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000038import io # C implementation of io
39import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000040
Guido van Rossuma9e20242007-03-08 00:43:48 +000041
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042def _default_chunk_size():
43 """Get the default TextIOWrapper chunk size"""
44 with open(__file__, "r", encoding="latin1") as f:
45 return f._CHUNK_SIZE
46
47
48class MockRawIO:
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000050 def __init__(self, read_stack=()):
51 self._read_stack = list(read_stack)
52 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000053 self._reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000054
55 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000056 self._reads += 1
Guido van Rossum68bbcd22007-02-27 17:19:33 +000057 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000058 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000059 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000060 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000061
Guido van Rossum01a27522007-03-07 01:00:12 +000062 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000063 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000064 return len(b)
65
66 def writable(self):
67 return True
68
Guido van Rossum68bbcd22007-02-27 17:19:33 +000069 def fileno(self):
70 return 42
71
72 def readable(self):
73 return True
74
Guido van Rossum01a27522007-03-07 01:00:12 +000075 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000076 return True
77
Guido van Rossum01a27522007-03-07 01:00:12 +000078 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000079 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000080
81 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000082 return 0 # same comment as above
83
84 def readinto(self, buf):
85 self._reads += 1
86 max_len = len(buf)
87 try:
88 data = self._read_stack[0]
89 except IndexError:
90 return 0
91 if data is None:
92 del self._read_stack[0]
93 return None
94 n = len(data)
95 if len(data) <= max_len:
96 del self._read_stack[0]
97 buf[:n] = data
98 return n
99 else:
100 buf[:] = data[:max_len]
101 self._read_stack[0] = data[max_len:]
102 return max_len
103
104 def truncate(self, pos=None):
105 return pos
106
107class CMockRawIO(MockRawIO, io.RawIOBase):
108 pass
109
110class PyMockRawIO(MockRawIO, pyio.RawIOBase):
111 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000112
Guido van Rossuma9e20242007-03-08 00:43:48 +0000113
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000114class MisbehavedRawIO(MockRawIO):
115 def write(self, b):
116 return super().write(b) * 2
117
118 def read(self, n=None):
119 return super().read(n) * 2
120
121 def seek(self, pos, whence):
122 return -123
123
124 def tell(self):
125 return -456
126
127 def readinto(self, buf):
128 super().readinto(buf)
129 return len(buf) * 5
130
131class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
132 pass
133
134class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
135 pass
136
137
138class CloseFailureIO(MockRawIO):
139 closed = 0
140
141 def close(self):
142 if not self.closed:
143 self.closed = 1
144 raise IOError
145
146class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
147 pass
148
149class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
150 pass
151
152
153class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000154
155 def __init__(self, data):
156 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000157 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000158
159 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000160 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000161 self.read_history.append(None if res is None else len(res))
162 return res
163
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000164 def readinto(self, b):
165 res = super().readinto(b)
166 self.read_history.append(res)
167 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000168
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169class CMockFileIO(MockFileIO, io.BytesIO):
170 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000171
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000172class PyMockFileIO(MockFileIO, pyio.BytesIO):
173 pass
174
175
176class MockNonBlockWriterIO:
177
178 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000179 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000181
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000182 def pop_written(self):
183 s = b"".join(self._write_stack)
184 self._write_stack[:] = []
185 return s
186
187 def block_on(self, char):
188 """Block when a given char is encountered."""
189 self._blocker_char = char
190
191 def readable(self):
192 return True
193
194 def seekable(self):
195 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000196
Guido van Rossum01a27522007-03-07 01:00:12 +0000197 def writable(self):
198 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000200 def write(self, b):
201 b = bytes(b)
202 n = -1
203 if self._blocker_char:
204 try:
205 n = b.index(self._blocker_char)
206 except ValueError:
207 pass
208 else:
209 self._blocker_char = None
210 self._write_stack.append(b[:n])
211 raise self.BlockingIOError(0, "test blocking", n)
212 self._write_stack.append(b)
213 return len(b)
214
215class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
216 BlockingIOError = io.BlockingIOError
217
218class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
219 BlockingIOError = pyio.BlockingIOError
220
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
Guido van Rossum28524c72007-02-27 05:47:44 +0000222class IOTest(unittest.TestCase):
223
Neal Norwitze7789b12008-03-24 06:18:09 +0000224 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000225 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000226
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000227 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000228 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000229
Guido van Rossum28524c72007-02-27 05:47:44 +0000230 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000231 self.assertEqual(f.write(b"blah."), 5)
232 self.assertEqual(f.seek(0), 0)
233 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000234 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000235 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000236 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000237 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000238 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000239 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000240 self.assertEqual(f.seek(-1, 2), 13)
241 self.assertEqual(f.tell(), 13)
242 self.assertEqual(f.truncate(12), 12)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000243 self.assertEqual(f.tell(), 12)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000244 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000245
Guido van Rossum9b76da62007-04-11 01:09:03 +0000246 def read_ops(self, f, buffered=False):
247 data = f.read(5)
248 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000249 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000250 self.assertEqual(f.readinto(data), 5)
251 self.assertEqual(data, b" worl")
252 self.assertEqual(f.readinto(data), 2)
253 self.assertEqual(len(data), 5)
254 self.assertEqual(data[:2], b"d\n")
255 self.assertEqual(f.seek(0), 0)
256 self.assertEqual(f.read(20), b"hello world\n")
257 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000258 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000259 self.assertEqual(f.seek(-6, 2), 6)
260 self.assertEqual(f.read(5), b"world")
261 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000262 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000263 self.assertEqual(f.seek(-6, 1), 5)
264 self.assertEqual(f.read(5), b" worl")
265 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000266 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000267 if buffered:
268 f.seek(0)
269 self.assertEqual(f.read(), b"hello world\n")
270 f.seek(6)
271 self.assertEqual(f.read(), b"world\n")
272 self.assertEqual(f.read(), b"")
273
Guido van Rossum34d69e52007-04-10 20:08:41 +0000274 LARGE = 2**31
275
Guido van Rossum53807da2007-04-10 19:01:47 +0000276 def large_file_ops(self, f):
277 assert f.readable()
278 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000279 self.assertEqual(f.seek(self.LARGE), self.LARGE)
280 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000281 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000282 self.assertEqual(f.tell(), self.LARGE + 3)
283 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000285 self.assertEqual(f.tell(), self.LARGE + 2)
286 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000287 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000288 self.assertEqual(f.tell(), self.LARGE + 1)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000289 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
290 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000291 self.assertEqual(f.read(2), b"x")
292
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000293 def test_invalid_operations(self):
294 # Try writing on a file opened in read mode and vice-versa.
295 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000296 with self.open(support.TESTFN, mode) as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000297 self.assertRaises(IOError, fp.read)
298 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000299 with self.open(support.TESTFN, "rb") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000300 self.assertRaises(IOError, fp.write, b"blah")
301 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000302 with self.open(support.TESTFN, "r") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000303 self.assertRaises(IOError, fp.write, "blah")
304 self.assertRaises(IOError, fp.writelines, ["blah\n"])
305
Guido van Rossum28524c72007-02-27 05:47:44 +0000306 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000307 with self.open(support.TESTFN, "wb", buffering=0) as f:
308 self.assertEqual(f.readable(), False)
309 self.assertEqual(f.writable(), True)
310 self.assertEqual(f.seekable(), True)
311 self.write_ops(f)
312 with self.open(support.TESTFN, "rb", buffering=0) as f:
313 self.assertEqual(f.readable(), True)
314 self.assertEqual(f.writable(), False)
315 self.assertEqual(f.seekable(), True)
316 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000317
Guido van Rossum87429772007-04-10 21:06:59 +0000318 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000319 with self.open(support.TESTFN, "wb") as f:
320 self.assertEqual(f.readable(), False)
321 self.assertEqual(f.writable(), True)
322 self.assertEqual(f.seekable(), True)
323 self.write_ops(f)
324 with self.open(support.TESTFN, "rb") as f:
325 self.assertEqual(f.readable(), True)
326 self.assertEqual(f.writable(), False)
327 self.assertEqual(f.seekable(), True)
328 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000329
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000330 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000331 with self.open(support.TESTFN, "wb") as f:
332 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
333 with self.open(support.TESTFN, "rb") as f:
334 self.assertEqual(f.readline(), b"abc\n")
335 self.assertEqual(f.readline(10), b"def\n")
336 self.assertEqual(f.readline(2), b"xy")
337 self.assertEqual(f.readline(4), b"zzy\n")
338 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000339 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000340 self.assertRaises(TypeError, f.readline, 5.3)
341 with self.open(support.TESTFN, "r") as f:
342 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000343
Guido van Rossum28524c72007-02-27 05:47:44 +0000344 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000345 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000346 self.write_ops(f)
347 data = f.getvalue()
348 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000349 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000350 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000351
Guido van Rossum53807da2007-04-10 19:01:47 +0000352 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000353 # On Windows and Mac OSX this test comsumes large resources; It takes
354 # a long time to build the >2GB file and takes >2GB of disk space
355 # therefore the resource must be enabled to run this test.
356 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000357 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000358 print("\nTesting large file ops skipped on %s." % sys.platform,
359 file=sys.stderr)
360 print("It requires %d bytes and a long time." % self.LARGE,
361 file=sys.stderr)
362 print("Use 'regrtest.py -u largefile test_io' to run it.",
363 file=sys.stderr)
364 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000365 with self.open(support.TESTFN, "w+b", 0) as f:
366 self.large_file_ops(f)
367 with self.open(support.TESTFN, "w+b") as f:
368 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000369
370 def test_with_open(self):
371 for bufsize in (0, 1, 100):
372 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000373 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000374 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000375 self.assertEqual(f.closed, True)
376 f = None
377 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000378 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000379 1/0
380 except ZeroDivisionError:
381 self.assertEqual(f.closed, True)
382 else:
383 self.fail("1/0 didn't raise an exception")
384
Antoine Pitrou08838b62009-01-21 00:55:13 +0000385 # issue 5008
386 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000387 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000388 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000389 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000390 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000391 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000392 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000393 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000394 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000395
Guido van Rossum87429772007-04-10 21:06:59 +0000396 def test_destructor(self):
397 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000398 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000399 def __del__(self):
400 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000401 try:
402 f = super().__del__
403 except AttributeError:
404 pass
405 else:
406 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000407 def close(self):
408 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000409 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000410 def flush(self):
411 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000412 super().flush()
413 f = MyFileIO(support.TESTFN, "wb")
414 f.write(b"xxx")
415 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000416 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000417 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000418 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000419 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000420
421 def _check_base_destructor(self, base):
422 record = []
423 class MyIO(base):
424 def __init__(self):
425 # This exercises the availability of attributes on object
426 # destruction.
427 # (in the C version, close() is called by the tp_dealloc
428 # function, not by __del__)
429 self.on_del = 1
430 self.on_close = 2
431 self.on_flush = 3
432 def __del__(self):
433 record.append(self.on_del)
434 try:
435 f = super().__del__
436 except AttributeError:
437 pass
438 else:
439 f()
440 def close(self):
441 record.append(self.on_close)
442 super().close()
443 def flush(self):
444 record.append(self.on_flush)
445 super().flush()
446 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000447 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000448 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000449 self.assertEqual(record, [1, 2, 3])
450
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000451 def test_IOBase_destructor(self):
452 self._check_base_destructor(self.IOBase)
453
454 def test_RawIOBase_destructor(self):
455 self._check_base_destructor(self.RawIOBase)
456
457 def test_BufferedIOBase_destructor(self):
458 self._check_base_destructor(self.BufferedIOBase)
459
460 def test_TextIOBase_destructor(self):
461 self._check_base_destructor(self.TextIOBase)
462
Guido van Rossum87429772007-04-10 21:06:59 +0000463 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000464 with self.open(support.TESTFN, "wb") as f:
465 f.write(b"xxx")
466 with self.open(support.TESTFN, "rb") as f:
467 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000468
Guido van Rossumd4103952007-04-12 05:44:49 +0000469 def test_array_writes(self):
470 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000471 n = len(a.tostring())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000472 with self.open(support.TESTFN, "wb", 0) as f:
473 self.assertEqual(f.write(a), n)
474 with self.open(support.TESTFN, "wb") as f:
475 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000476
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000477 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000478 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000479 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000480
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000481 def test_read_closed(self):
482 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000483 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000484 with self.open(support.TESTFN, "r") as f:
485 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000486 self.assertEqual(file.read(), "egg\n")
487 file.seek(0)
488 file.close()
489 self.assertRaises(ValueError, file.read)
490
491 def test_no_closefd_with_filename(self):
492 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000493 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000494
495 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000496 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000497 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000498 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000499 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000500 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000501 self.assertEqual(file.buffer.raw.closefd, False)
502
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000503 def test_garbage_collection(self):
504 # FileIO objects are collected, and collecting them flushes
505 # all data to disk.
506 f = self.FileIO(support.TESTFN, "wb")
507 f.write(b"abcxxx")
508 f.f = f
509 wr = weakref.ref(f)
510 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000511 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000512 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000513 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000514 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000515
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000516 def test_unbounded_file(self):
517 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
518 zero = "/dev/zero"
519 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000520 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000521 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000522 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000523 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000524 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000525 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000526 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000527 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000528 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000529 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000530 self.assertRaises(OverflowError, f.read)
531
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000532class CIOTest(IOTest):
533 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000534
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000535class PyIOTest(IOTest):
536 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000537
Guido van Rossuma9e20242007-03-08 00:43:48 +0000538
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000539class CommonBufferedTests:
540 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
541
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000542 def test_detach(self):
543 raw = self.MockRawIO()
544 buf = self.tp(raw)
545 self.assertIs(buf.detach(), raw)
546 self.assertRaises(ValueError, buf.detach)
547
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000548 def test_fileno(self):
549 rawio = self.MockRawIO()
550 bufio = self.tp(rawio)
551
552 self.assertEquals(42, bufio.fileno())
553
554 def test_no_fileno(self):
555 # XXX will we always have fileno() function? If so, kill
556 # this test. Else, write it.
557 pass
558
559 def test_invalid_args(self):
560 rawio = self.MockRawIO()
561 bufio = self.tp(rawio)
562 # Invalid whence
563 self.assertRaises(ValueError, bufio.seek, 0, -1)
564 self.assertRaises(ValueError, bufio.seek, 0, 3)
565
566 def test_override_destructor(self):
567 tp = self.tp
568 record = []
569 class MyBufferedIO(tp):
570 def __del__(self):
571 record.append(1)
572 try:
573 f = super().__del__
574 except AttributeError:
575 pass
576 else:
577 f()
578 def close(self):
579 record.append(2)
580 super().close()
581 def flush(self):
582 record.append(3)
583 super().flush()
584 rawio = self.MockRawIO()
585 bufio = MyBufferedIO(rawio)
586 writable = bufio.writable()
587 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000588 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000589 if writable:
590 self.assertEqual(record, [1, 2, 3])
591 else:
592 self.assertEqual(record, [1, 2])
593
594 def test_context_manager(self):
595 # Test usability as a context manager
596 rawio = self.MockRawIO()
597 bufio = self.tp(rawio)
598 def _with():
599 with bufio:
600 pass
601 _with()
602 # bufio should now be closed, and using it a second time should raise
603 # a ValueError.
604 self.assertRaises(ValueError, _with)
605
606 def test_error_through_destructor(self):
607 # Test that the exception state is not modified by a destructor,
608 # even if close() fails.
609 rawio = self.CloseFailureIO()
610 def f():
611 self.tp(rawio).xyzzy
612 with support.captured_output("stderr") as s:
613 self.assertRaises(AttributeError, f)
614 s = s.getvalue().strip()
615 if s:
616 # The destructor *may* have printed an unraisable error, check it
617 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000618 self.assertTrue(s.startswith("Exception IOError: "), s)
619 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000620
Antoine Pitrou716c4442009-05-23 19:04:03 +0000621 def test_repr(self):
622 raw = self.MockRawIO()
623 b = self.tp(raw)
624 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
625 self.assertEqual(repr(b), "<%s>" % clsname)
626 raw.name = "dummy"
627 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
628 raw.name = b"dummy"
629 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
630
Guido van Rossum78892e42007-04-06 17:31:18 +0000631
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000632class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
633 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000634
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000635 def test_constructor(self):
636 rawio = self.MockRawIO([b"abc"])
637 bufio = self.tp(rawio)
638 bufio.__init__(rawio)
639 bufio.__init__(rawio, buffer_size=1024)
640 bufio.__init__(rawio, buffer_size=16)
641 self.assertEquals(b"abc", bufio.read())
642 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
643 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
644 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
645 rawio = self.MockRawIO([b"abc"])
646 bufio.__init__(rawio)
647 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000648
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000649 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000650 for arg in (None, 7):
651 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
652 bufio = self.tp(rawio)
653 self.assertEquals(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000654 # Invalid args
655 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000656
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000657 def test_read1(self):
658 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
659 bufio = self.tp(rawio)
660 self.assertEquals(b"a", bufio.read(1))
661 self.assertEquals(b"b", bufio.read1(1))
662 self.assertEquals(rawio._reads, 1)
663 self.assertEquals(b"c", bufio.read1(100))
664 self.assertEquals(rawio._reads, 1)
665 self.assertEquals(b"d", bufio.read1(100))
666 self.assertEquals(rawio._reads, 2)
667 self.assertEquals(b"efg", bufio.read1(100))
668 self.assertEquals(rawio._reads, 3)
669 self.assertEquals(b"", bufio.read1(100))
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000670 self.assertEquals(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000671 # Invalid args
672 self.assertRaises(ValueError, bufio.read1, -1)
673
674 def test_readinto(self):
675 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
676 bufio = self.tp(rawio)
677 b = bytearray(2)
678 self.assertEquals(bufio.readinto(b), 2)
679 self.assertEquals(b, b"ab")
680 self.assertEquals(bufio.readinto(b), 2)
681 self.assertEquals(b, b"cd")
682 self.assertEquals(bufio.readinto(b), 2)
683 self.assertEquals(b, b"ef")
684 self.assertEquals(bufio.readinto(b), 1)
685 self.assertEquals(b, b"gf")
686 self.assertEquals(bufio.readinto(b), 0)
687 self.assertEquals(b, b"gf")
688
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000689 def test_readlines(self):
690 def bufio():
691 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
692 return self.tp(rawio)
693 self.assertEquals(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
694 self.assertEquals(bufio().readlines(5), [b"abc\n", b"d\n"])
695 self.assertEquals(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
696
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000697 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000698 data = b"abcdefghi"
699 dlen = len(data)
700
701 tests = [
702 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
703 [ 100, [ 3, 3, 3], [ dlen ] ],
704 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
705 ]
706
707 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000708 rawio = self.MockFileIO(data)
709 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000710 pos = 0
711 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000712 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000713 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000714 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000715 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000716
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000717 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000718 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000719 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
720 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000721
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000722 self.assertEquals(b"abcd", bufio.read(6))
723 self.assertEquals(b"e", bufio.read(1))
724 self.assertEquals(b"fg", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000725 self.assertEquals(b"", bufio.peek(1))
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000726 self.assertTrue(None is bufio.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000727 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000728
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000729 def test_read_past_eof(self):
730 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
731 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000732
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000733 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000734
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000735 def test_read_all(self):
736 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
737 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000738
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000739 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000740
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000741 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000742 try:
743 # Write out many bytes with exactly the same number of 0's,
744 # 1's... 255's. This will help us check that concurrent reading
745 # doesn't duplicate or forget contents.
746 N = 1000
747 l = list(range(256)) * N
748 random.shuffle(l)
749 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000750 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000751 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000752 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000753 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000754 errors = []
755 results = []
756 def f():
757 try:
758 # Intra-buffer read then buffer-flushing read
759 for n in cycle([1, 19]):
760 s = bufio.read(n)
761 if not s:
762 break
763 # list.append() is atomic
764 results.append(s)
765 except Exception as e:
766 errors.append(e)
767 raise
768 threads = [threading.Thread(target=f) for x in range(20)]
769 for t in threads:
770 t.start()
771 time.sleep(0.02) # yield
772 for t in threads:
773 t.join()
774 self.assertFalse(errors,
775 "the following exceptions were caught: %r" % errors)
776 s = b''.join(results)
777 for i in range(256):
778 c = bytes(bytearray([i]))
779 self.assertEqual(s.count(c), N)
780 finally:
781 support.unlink(support.TESTFN)
782
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000783 def test_misbehaved_io(self):
784 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
785 bufio = self.tp(rawio)
786 self.assertRaises(IOError, bufio.seek, 0)
787 self.assertRaises(IOError, bufio.tell)
788
789class CBufferedReaderTest(BufferedReaderTest):
790 tp = io.BufferedReader
791
792 def test_constructor(self):
793 BufferedReaderTest.test_constructor(self)
794 # The allocation can succeed on 32-bit builds, e.g. with more
795 # than 2GB RAM and a 64-bit kernel.
796 if sys.maxsize > 0x7FFFFFFF:
797 rawio = self.MockRawIO()
798 bufio = self.tp(rawio)
799 self.assertRaises((OverflowError, MemoryError, ValueError),
800 bufio.__init__, rawio, sys.maxsize)
801
802 def test_initialization(self):
803 rawio = self.MockRawIO([b"abc"])
804 bufio = self.tp(rawio)
805 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
806 self.assertRaises(ValueError, bufio.read)
807 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
808 self.assertRaises(ValueError, bufio.read)
809 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
810 self.assertRaises(ValueError, bufio.read)
811
812 def test_misbehaved_io_read(self):
813 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
814 bufio = self.tp(rawio)
815 # _pyio.BufferedReader seems to implement reading different, so that
816 # checking this is not so easy.
817 self.assertRaises(IOError, bufio.read, 10)
818
819 def test_garbage_collection(self):
820 # C BufferedReader objects are collected.
821 # The Python version has __del__, so it ends into gc.garbage instead
822 rawio = self.FileIO(support.TESTFN, "w+b")
823 f = self.tp(rawio)
824 f.f = f
825 wr = weakref.ref(f)
826 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000827 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000828 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000829
830class PyBufferedReaderTest(BufferedReaderTest):
831 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000832
Guido van Rossuma9e20242007-03-08 00:43:48 +0000833
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000834class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
835 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000836
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000837 def test_constructor(self):
838 rawio = self.MockRawIO()
839 bufio = self.tp(rawio)
840 bufio.__init__(rawio)
841 bufio.__init__(rawio, buffer_size=1024)
842 bufio.__init__(rawio, buffer_size=16)
843 self.assertEquals(3, bufio.write(b"abc"))
844 bufio.flush()
845 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
846 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
847 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
848 bufio.__init__(rawio)
849 self.assertEquals(3, bufio.write(b"ghi"))
850 bufio.flush()
851 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
852
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000853 def test_detach_flush(self):
854 raw = self.MockRawIO()
855 buf = self.tp(raw)
856 buf.write(b"howdy!")
857 self.assertFalse(raw._write_stack)
858 buf.detach()
859 self.assertEqual(raw._write_stack, [b"howdy!"])
860
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000861 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000862 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000863 writer = self.MockRawIO()
864 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000865 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000866 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000867
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000868 def test_write_overflow(self):
869 writer = self.MockRawIO()
870 bufio = self.tp(writer, 8)
871 contents = b"abcdefghijklmnop"
872 for n in range(0, len(contents), 3):
873 bufio.write(contents[n:n+3])
874 flushed = b"".join(writer._write_stack)
875 # At least (total - 8) bytes were implicitly flushed, perhaps more
876 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000877 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000878
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000879 def check_writes(self, intermediate_func):
880 # Lots of writes, test the flushed output is as expected.
881 contents = bytes(range(256)) * 1000
882 n = 0
883 writer = self.MockRawIO()
884 bufio = self.tp(writer, 13)
885 # Generator of write sizes: repeat each N 15 times then proceed to N+1
886 def gen_sizes():
887 for size in count(1):
888 for i in range(15):
889 yield size
890 sizes = gen_sizes()
891 while n < len(contents):
892 size = min(next(sizes), len(contents) - n)
893 self.assertEquals(bufio.write(contents[n:n+size]), size)
894 intermediate_func(bufio)
895 n += size
896 bufio.flush()
897 self.assertEquals(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000898
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000899 def test_writes(self):
900 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000901
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000902 def test_writes_and_flushes(self):
903 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +0000904
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000905 def test_writes_and_seeks(self):
906 def _seekabs(bufio):
907 pos = bufio.tell()
908 bufio.seek(pos + 1, 0)
909 bufio.seek(pos - 1, 0)
910 bufio.seek(pos, 0)
911 self.check_writes(_seekabs)
912 def _seekrel(bufio):
913 pos = bufio.seek(0, 1)
914 bufio.seek(+1, 1)
915 bufio.seek(-1, 1)
916 bufio.seek(pos, 0)
917 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +0000918
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000919 def test_writes_and_truncates(self):
920 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +0000921
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000922 def test_write_non_blocking(self):
923 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +0000924 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +0000925
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000926 self.assertEquals(bufio.write(b"abcd"), 4)
927 self.assertEquals(bufio.write(b"efghi"), 5)
928 # 1 byte will be written, the rest will be buffered
929 raw.block_on(b"k")
930 self.assertEquals(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +0000931
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000932 # 8 bytes will be written, 8 will be buffered and the rest will be lost
933 raw.block_on(b"0")
934 try:
935 bufio.write(b"opqrwxyz0123456789")
936 except self.BlockingIOError as e:
937 written = e.characters_written
938 else:
939 self.fail("BlockingIOError should have been raised")
940 self.assertEquals(written, 16)
941 self.assertEquals(raw.pop_written(),
942 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +0000943
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000944 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
945 s = raw.pop_written()
946 # Previously buffered bytes were flushed
947 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +0000948
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000949 def test_write_and_rewind(self):
950 raw = io.BytesIO()
951 bufio = self.tp(raw, 4)
952 self.assertEqual(bufio.write(b"abcdef"), 6)
953 self.assertEqual(bufio.tell(), 6)
954 bufio.seek(0, 0)
955 self.assertEqual(bufio.write(b"XY"), 2)
956 bufio.seek(6, 0)
957 self.assertEqual(raw.getvalue(), b"XYcdef")
958 self.assertEqual(bufio.write(b"123456"), 6)
959 bufio.flush()
960 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000961
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000962 def test_flush(self):
963 writer = self.MockRawIO()
964 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000965 bufio.write(b"abc")
966 bufio.flush()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000967 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000968
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000969 def test_destructor(self):
970 writer = self.MockRawIO()
971 bufio = self.tp(writer, 8)
972 bufio.write(b"abc")
973 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000974 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000975 self.assertEquals(b"abc", writer._write_stack[0])
976
977 def test_truncate(self):
978 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000979 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000980 bufio = self.tp(raw, 8)
981 bufio.write(b"abcdef")
982 self.assertEqual(bufio.truncate(3), 3)
983 self.assertEqual(bufio.tell(), 3)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000984 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000985 self.assertEqual(f.read(), b"abc")
986
987 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000988 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000989 # Write out many bytes from many threads and test they were
990 # all flushed.
991 N = 1000
992 contents = bytes(range(256)) * N
993 sizes = cycle([1, 19])
994 n = 0
995 queue = deque()
996 while n < len(contents):
997 size = next(sizes)
998 queue.append(contents[n:n+size])
999 n += size
1000 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001001 # We use a real file object because it allows us to
1002 # exercise situations where the GIL is released before
1003 # writing the buffer to the raw streams. This is in addition
1004 # to concurrency issues due to switching threads in the middle
1005 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001006 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001007 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001008 errors = []
1009 def f():
1010 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001011 while True:
1012 try:
1013 s = queue.popleft()
1014 except IndexError:
1015 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001016 bufio.write(s)
1017 except Exception as e:
1018 errors.append(e)
1019 raise
1020 threads = [threading.Thread(target=f) for x in range(20)]
1021 for t in threads:
1022 t.start()
1023 time.sleep(0.02) # yield
1024 for t in threads:
1025 t.join()
1026 self.assertFalse(errors,
1027 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001028 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001029 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001030 s = f.read()
1031 for i in range(256):
1032 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001033 finally:
1034 support.unlink(support.TESTFN)
1035
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001036 def test_misbehaved_io(self):
1037 rawio = self.MisbehavedRawIO()
1038 bufio = self.tp(rawio, 5)
1039 self.assertRaises(IOError, bufio.seek, 0)
1040 self.assertRaises(IOError, bufio.tell)
1041 self.assertRaises(IOError, bufio.write, b"abcdef")
1042
Benjamin Peterson59406a92009-03-26 17:10:29 +00001043 def test_max_buffer_size_deprecation(self):
1044 with support.check_warnings() as w:
1045 warnings.simplefilter("always", DeprecationWarning)
1046 self.tp(self.MockRawIO(), 8, 12)
1047 self.assertEqual(len(w.warnings), 1)
1048 warning = w.warnings[0]
1049 self.assertTrue(warning.category is DeprecationWarning)
1050 self.assertEqual(str(warning.message),
1051 "max_buffer_size is deprecated")
1052
1053
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001054class CBufferedWriterTest(BufferedWriterTest):
1055 tp = io.BufferedWriter
1056
1057 def test_constructor(self):
1058 BufferedWriterTest.test_constructor(self)
1059 # The allocation can succeed on 32-bit builds, e.g. with more
1060 # than 2GB RAM and a 64-bit kernel.
1061 if sys.maxsize > 0x7FFFFFFF:
1062 rawio = self.MockRawIO()
1063 bufio = self.tp(rawio)
1064 self.assertRaises((OverflowError, MemoryError, ValueError),
1065 bufio.__init__, rawio, sys.maxsize)
1066
1067 def test_initialization(self):
1068 rawio = self.MockRawIO()
1069 bufio = self.tp(rawio)
1070 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1071 self.assertRaises(ValueError, bufio.write, b"def")
1072 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1073 self.assertRaises(ValueError, bufio.write, b"def")
1074 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1075 self.assertRaises(ValueError, bufio.write, b"def")
1076
1077 def test_garbage_collection(self):
1078 # C BufferedWriter objects are collected, and collecting them flushes
1079 # all data to disk.
1080 # The Python version has __del__, so it ends into gc.garbage instead
1081 rawio = self.FileIO(support.TESTFN, "w+b")
1082 f = self.tp(rawio)
1083 f.write(b"123xxx")
1084 f.x = f
1085 wr = weakref.ref(f)
1086 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001087 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001088 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001089 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001090 self.assertEqual(f.read(), b"123xxx")
1091
1092
1093class PyBufferedWriterTest(BufferedWriterTest):
1094 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001095
Guido van Rossum01a27522007-03-07 01:00:12 +00001096class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001097
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001098 def test_constructor(self):
1099 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001100 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001101
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001102 def test_detach(self):
1103 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1104 self.assertRaises(self.UnsupportedOperation, pair.detach)
1105
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001106 def test_constructor_max_buffer_size_deprecation(self):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001107 with support.check_warnings() as w:
1108 warnings.simplefilter("always", DeprecationWarning)
1109 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1110 self.assertEqual(len(w.warnings), 1)
1111 warning = w.warnings[0]
1112 self.assertTrue(warning.category is DeprecationWarning)
1113 self.assertEqual(str(warning.message),
1114 "max_buffer_size is deprecated")
1115
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001116 def test_constructor_with_not_readable(self):
1117 class NotReadable(MockRawIO):
1118 def readable(self):
1119 return False
1120
1121 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1122
1123 def test_constructor_with_not_writeable(self):
1124 class NotWriteable(MockRawIO):
1125 def writable(self):
1126 return False
1127
1128 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1129
1130 def test_read(self):
1131 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1132
1133 self.assertEqual(pair.read(3), b"abc")
1134 self.assertEqual(pair.read(1), b"d")
1135 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001136 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1137 self.assertEqual(pair.read(None), b"abc")
1138
1139 def test_readlines(self):
1140 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1141 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1142 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1143 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001144
1145 def test_read1(self):
1146 # .read1() is delegated to the underlying reader object, so this test
1147 # can be shallow.
1148 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1149
1150 self.assertEqual(pair.read1(3), b"abc")
1151
1152 def test_readinto(self):
1153 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1154
1155 data = bytearray(5)
1156 self.assertEqual(pair.readinto(data), 5)
1157 self.assertEqual(data, b"abcde")
1158
1159 def test_write(self):
1160 w = self.MockRawIO()
1161 pair = self.tp(self.MockRawIO(), w)
1162
1163 pair.write(b"abc")
1164 pair.flush()
1165 pair.write(b"def")
1166 pair.flush()
1167 self.assertEqual(w._write_stack, [b"abc", b"def"])
1168
1169 def test_peek(self):
1170 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1171
1172 self.assertTrue(pair.peek(3).startswith(b"abc"))
1173 self.assertEqual(pair.read(3), b"abc")
1174
1175 def test_readable(self):
1176 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1177 self.assertTrue(pair.readable())
1178
1179 def test_writeable(self):
1180 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1181 self.assertTrue(pair.writable())
1182
1183 def test_seekable(self):
1184 # BufferedRWPairs are never seekable, even if their readers and writers
1185 # are.
1186 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1187 self.assertFalse(pair.seekable())
1188
1189 # .flush() is delegated to the underlying writer object and has been
1190 # tested in the test_write method.
1191
1192 def test_close_and_closed(self):
1193 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1194 self.assertFalse(pair.closed)
1195 pair.close()
1196 self.assertTrue(pair.closed)
1197
1198 def test_isatty(self):
1199 class SelectableIsAtty(MockRawIO):
1200 def __init__(self, isatty):
1201 MockRawIO.__init__(self)
1202 self._isatty = isatty
1203
1204 def isatty(self):
1205 return self._isatty
1206
1207 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1208 self.assertFalse(pair.isatty())
1209
1210 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1211 self.assertTrue(pair.isatty())
1212
1213 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1214 self.assertTrue(pair.isatty())
1215
1216 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1217 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001218
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001219class CBufferedRWPairTest(BufferedRWPairTest):
1220 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001221
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001222class PyBufferedRWPairTest(BufferedRWPairTest):
1223 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001224
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001225
1226class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1227 read_mode = "rb+"
1228 write_mode = "wb+"
1229
1230 def test_constructor(self):
1231 BufferedReaderTest.test_constructor(self)
1232 BufferedWriterTest.test_constructor(self)
1233
1234 def test_read_and_write(self):
1235 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001236 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001237
1238 self.assertEqual(b"as", rw.read(2))
1239 rw.write(b"ddd")
1240 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001241 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001242 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001243 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001244
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001245 def test_seek_and_tell(self):
1246 raw = self.BytesIO(b"asdfghjkl")
1247 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001248
1249 self.assertEquals(b"as", rw.read(2))
1250 self.assertEquals(2, rw.tell())
1251 rw.seek(0, 0)
1252 self.assertEquals(b"asdf", rw.read(4))
1253
1254 rw.write(b"asdf")
1255 rw.seek(0, 0)
1256 self.assertEquals(b"asdfasdfl", rw.read())
1257 self.assertEquals(9, rw.tell())
1258 rw.seek(-4, 2)
1259 self.assertEquals(5, rw.tell())
1260 rw.seek(2, 1)
1261 self.assertEquals(7, rw.tell())
1262 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001263 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001264
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001265 def check_flush_and_read(self, read_func):
1266 raw = self.BytesIO(b"abcdefghi")
1267 bufio = self.tp(raw)
1268
1269 self.assertEquals(b"ab", read_func(bufio, 2))
1270 bufio.write(b"12")
1271 self.assertEquals(b"ef", read_func(bufio, 2))
1272 self.assertEquals(6, bufio.tell())
1273 bufio.flush()
1274 self.assertEquals(6, bufio.tell())
1275 self.assertEquals(b"ghi", read_func(bufio))
1276 raw.seek(0, 0)
1277 raw.write(b"XYZ")
1278 # flush() resets the read buffer
1279 bufio.flush()
1280 bufio.seek(0, 0)
1281 self.assertEquals(b"XYZ", read_func(bufio, 3))
1282
1283 def test_flush_and_read(self):
1284 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1285
1286 def test_flush_and_readinto(self):
1287 def _readinto(bufio, n=-1):
1288 b = bytearray(n if n >= 0 else 9999)
1289 n = bufio.readinto(b)
1290 return bytes(b[:n])
1291 self.check_flush_and_read(_readinto)
1292
1293 def test_flush_and_peek(self):
1294 def _peek(bufio, n=-1):
1295 # This relies on the fact that the buffer can contain the whole
1296 # raw stream, otherwise peek() can return less.
1297 b = bufio.peek(n)
1298 if n != -1:
1299 b = b[:n]
1300 bufio.seek(len(b), 1)
1301 return b
1302 self.check_flush_and_read(_peek)
1303
1304 def test_flush_and_write(self):
1305 raw = self.BytesIO(b"abcdefghi")
1306 bufio = self.tp(raw)
1307
1308 bufio.write(b"123")
1309 bufio.flush()
1310 bufio.write(b"45")
1311 bufio.flush()
1312 bufio.seek(0, 0)
1313 self.assertEquals(b"12345fghi", raw.getvalue())
1314 self.assertEquals(b"12345fghi", bufio.read())
1315
1316 def test_threads(self):
1317 BufferedReaderTest.test_threads(self)
1318 BufferedWriterTest.test_threads(self)
1319
1320 def test_writes_and_peek(self):
1321 def _peek(bufio):
1322 bufio.peek(1)
1323 self.check_writes(_peek)
1324 def _peek(bufio):
1325 pos = bufio.tell()
1326 bufio.seek(-1, 1)
1327 bufio.peek(1)
1328 bufio.seek(pos, 0)
1329 self.check_writes(_peek)
1330
1331 def test_writes_and_reads(self):
1332 def _read(bufio):
1333 bufio.seek(-1, 1)
1334 bufio.read(1)
1335 self.check_writes(_read)
1336
1337 def test_writes_and_read1s(self):
1338 def _read1(bufio):
1339 bufio.seek(-1, 1)
1340 bufio.read1(1)
1341 self.check_writes(_read1)
1342
1343 def test_writes_and_readintos(self):
1344 def _read(bufio):
1345 bufio.seek(-1, 1)
1346 bufio.readinto(bytearray(1))
1347 self.check_writes(_read)
1348
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001349 def test_write_after_readahead(self):
1350 # Issue #6629: writing after the buffer was filled by readahead should
1351 # first rewind the raw stream.
1352 for overwrite_size in [1, 5]:
1353 raw = self.BytesIO(b"A" * 10)
1354 bufio = self.tp(raw, 4)
1355 # Trigger readahead
1356 self.assertEqual(bufio.read(1), b"A")
1357 self.assertEqual(bufio.tell(), 1)
1358 # Overwriting should rewind the raw stream if it needs so
1359 bufio.write(b"B" * overwrite_size)
1360 self.assertEqual(bufio.tell(), overwrite_size + 1)
1361 # If the write size was smaller than the buffer size, flush() and
1362 # check that rewind happens.
1363 bufio.flush()
1364 self.assertEqual(bufio.tell(), overwrite_size + 1)
1365 s = raw.getvalue()
1366 self.assertEqual(s,
1367 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1368
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001369 def test_misbehaved_io(self):
1370 BufferedReaderTest.test_misbehaved_io(self)
1371 BufferedWriterTest.test_misbehaved_io(self)
1372
1373class CBufferedRandomTest(BufferedRandomTest):
1374 tp = io.BufferedRandom
1375
1376 def test_constructor(self):
1377 BufferedRandomTest.test_constructor(self)
1378 # The allocation can succeed on 32-bit builds, e.g. with more
1379 # than 2GB RAM and a 64-bit kernel.
1380 if sys.maxsize > 0x7FFFFFFF:
1381 rawio = self.MockRawIO()
1382 bufio = self.tp(rawio)
1383 self.assertRaises((OverflowError, MemoryError, ValueError),
1384 bufio.__init__, rawio, sys.maxsize)
1385
1386 def test_garbage_collection(self):
1387 CBufferedReaderTest.test_garbage_collection(self)
1388 CBufferedWriterTest.test_garbage_collection(self)
1389
1390class PyBufferedRandomTest(BufferedRandomTest):
1391 tp = pyio.BufferedRandom
1392
1393
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001394# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1395# properties:
1396# - A single output character can correspond to many bytes of input.
1397# - The number of input bytes to complete the character can be
1398# undetermined until the last input byte is received.
1399# - The number of input bytes can vary depending on previous input.
1400# - A single input byte can correspond to many characters of output.
1401# - The number of output characters can be undetermined until the
1402# last input byte is received.
1403# - The number of output characters can vary depending on previous input.
1404
1405class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1406 """
1407 For testing seek/tell behavior with a stateful, buffering decoder.
1408
1409 Input is a sequence of words. Words may be fixed-length (length set
1410 by input) or variable-length (period-terminated). In variable-length
1411 mode, extra periods are ignored. Possible words are:
1412 - 'i' followed by a number sets the input length, I (maximum 99).
1413 When I is set to 0, words are space-terminated.
1414 - 'o' followed by a number sets the output length, O (maximum 99).
1415 - Any other word is converted into a word followed by a period on
1416 the output. The output word consists of the input word truncated
1417 or padded out with hyphens to make its length equal to O. If O
1418 is 0, the word is output verbatim without truncating or padding.
1419 I and O are initially set to 1. When I changes, any buffered input is
1420 re-scanned according to the new I. EOF also terminates the last word.
1421 """
1422
1423 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001424 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001425 self.reset()
1426
1427 def __repr__(self):
1428 return '<SID %x>' % id(self)
1429
1430 def reset(self):
1431 self.i = 1
1432 self.o = 1
1433 self.buffer = bytearray()
1434
1435 def getstate(self):
1436 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1437 return bytes(self.buffer), i*100 + o
1438
1439 def setstate(self, state):
1440 buffer, io = state
1441 self.buffer = bytearray(buffer)
1442 i, o = divmod(io, 100)
1443 self.i, self.o = i ^ 1, o ^ 1
1444
1445 def decode(self, input, final=False):
1446 output = ''
1447 for b in input:
1448 if self.i == 0: # variable-length, terminated with period
1449 if b == ord('.'):
1450 if self.buffer:
1451 output += self.process_word()
1452 else:
1453 self.buffer.append(b)
1454 else: # fixed-length, terminate after self.i bytes
1455 self.buffer.append(b)
1456 if len(self.buffer) == self.i:
1457 output += self.process_word()
1458 if final and self.buffer: # EOF terminates the last word
1459 output += self.process_word()
1460 return output
1461
1462 def process_word(self):
1463 output = ''
1464 if self.buffer[0] == ord('i'):
1465 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1466 elif self.buffer[0] == ord('o'):
1467 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1468 else:
1469 output = self.buffer.decode('ascii')
1470 if len(output) < self.o:
1471 output += '-'*self.o # pad out with hyphens
1472 if self.o:
1473 output = output[:self.o] # truncate to output length
1474 output += '.'
1475 self.buffer = bytearray()
1476 return output
1477
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001478 codecEnabled = False
1479
1480 @classmethod
1481 def lookupTestDecoder(cls, name):
1482 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001483 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001484 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001485 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001486 incrementalencoder=None,
1487 streamreader=None, streamwriter=None,
1488 incrementaldecoder=cls)
1489
1490# Register the previous decoder for testing.
1491# Disabled by default, tests will enable it.
1492codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1493
1494
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001495class StatefulIncrementalDecoderTest(unittest.TestCase):
1496 """
1497 Make sure the StatefulIncrementalDecoder actually works.
1498 """
1499
1500 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001501 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001502 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001503 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001504 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001505 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001506 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001507 # I=0, O=6 (variable-length input, fixed-length output)
1508 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1509 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001510 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001511 # I=6, O=3 (fixed-length input > fixed-length output)
1512 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1513 # I=0, then 3; O=29, then 15 (with longer output)
1514 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1515 'a----------------------------.' +
1516 'b----------------------------.' +
1517 'cde--------------------------.' +
1518 'abcdefghijabcde.' +
1519 'a.b------------.' +
1520 '.c.------------.' +
1521 'd.e------------.' +
1522 'k--------------.' +
1523 'l--------------.' +
1524 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001525 ]
1526
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001527 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001528 # Try a few one-shot test cases.
1529 for input, eof, output in self.test_cases:
1530 d = StatefulIncrementalDecoder()
1531 self.assertEquals(d.decode(input, eof), output)
1532
1533 # Also test an unfinished decode, followed by forcing EOF.
1534 d = StatefulIncrementalDecoder()
1535 self.assertEquals(d.decode(b'oiabcd'), '')
1536 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001537
1538class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001539
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001540 def setUp(self):
1541 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1542 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001543 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001544
Guido van Rossumd0712812007-04-11 16:32:43 +00001545 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001546 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001547
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001548 def test_constructor(self):
1549 r = self.BytesIO(b"\xc3\xa9\n\n")
1550 b = self.BufferedReader(r, 1000)
1551 t = self.TextIOWrapper(b)
1552 t.__init__(b, encoding="latin1", newline="\r\n")
1553 self.assertEquals(t.encoding, "latin1")
1554 self.assertEquals(t.line_buffering, False)
1555 t.__init__(b, encoding="utf8", line_buffering=True)
1556 self.assertEquals(t.encoding, "utf8")
1557 self.assertEquals(t.line_buffering, True)
1558 self.assertEquals("\xe9\n", t.readline())
1559 self.assertRaises(TypeError, t.__init__, b, newline=42)
1560 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1561
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001562 def test_detach(self):
1563 r = self.BytesIO()
1564 b = self.BufferedWriter(r)
1565 t = self.TextIOWrapper(b)
1566 self.assertIs(t.detach(), b)
1567
1568 t = self.TextIOWrapper(b, encoding="ascii")
1569 t.write("howdy")
1570 self.assertFalse(r.getvalue())
1571 t.detach()
1572 self.assertEqual(r.getvalue(), b"howdy")
1573 self.assertRaises(ValueError, t.detach)
1574
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001575 def test_repr(self):
1576 raw = self.BytesIO("hello".encode("utf-8"))
1577 b = self.BufferedReader(raw)
1578 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001579 modname = self.TextIOWrapper.__module__
1580 self.assertEqual(repr(t),
1581 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1582 raw.name = "dummy"
1583 self.assertEqual(repr(t),
1584 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1585 raw.name = b"dummy"
1586 self.assertEqual(repr(t),
1587 "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001588
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001589 def test_line_buffering(self):
1590 r = self.BytesIO()
1591 b = self.BufferedWriter(r, 1000)
1592 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001593 t.write("X")
1594 self.assertEquals(r.getvalue(), b"") # No flush happened
1595 t.write("Y\nZ")
1596 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1597 t.write("A\rB")
1598 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1599
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001600 def test_encoding(self):
1601 # Check the encoding attribute is always set, and valid
1602 b = self.BytesIO()
1603 t = self.TextIOWrapper(b, encoding="utf8")
1604 self.assertEqual(t.encoding, "utf8")
1605 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001606 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001607 codecs.lookup(t.encoding)
1608
1609 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001610 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001611 b = self.BytesIO(b"abc\n\xff\n")
1612 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001613 self.assertRaises(UnicodeError, t.read)
1614 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001615 b = self.BytesIO(b"abc\n\xff\n")
1616 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001617 self.assertRaises(UnicodeError, t.read)
1618 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001619 b = self.BytesIO(b"abc\n\xff\n")
1620 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001621 self.assertEquals(t.read(), "abc\n\n")
1622 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001623 b = self.BytesIO(b"abc\n\xff\n")
1624 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001625 self.assertEquals(t.read(), "abc\n\ufffd\n")
1626
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001627 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001628 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001629 b = self.BytesIO()
1630 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001631 self.assertRaises(UnicodeError, t.write, "\xff")
1632 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001633 b = self.BytesIO()
1634 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001635 self.assertRaises(UnicodeError, t.write, "\xff")
1636 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001637 b = self.BytesIO()
1638 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001639 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001640 t.write("abc\xffdef\n")
1641 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001642 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001643 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001644 b = self.BytesIO()
1645 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001646 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001647 t.write("abc\xffdef\n")
1648 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001649 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001650
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001651 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001652 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1653
1654 tests = [
1655 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001656 [ '', input_lines ],
1657 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1658 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1659 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001660 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001661 encodings = (
1662 'utf-8', 'latin-1',
1663 'utf-16', 'utf-16-le', 'utf-16-be',
1664 'utf-32', 'utf-32-le', 'utf-32-be',
1665 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001666
Guido van Rossum8358db22007-08-18 21:39:55 +00001667 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001668 # character in TextIOWrapper._pending_line.
1669 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001670 # XXX: str.encode() should return bytes
1671 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001672 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001673 for bufsize in range(1, 10):
1674 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001675 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1676 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001677 encoding=encoding)
1678 if do_reads:
1679 got_lines = []
1680 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001681 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001682 if c2 == '':
1683 break
1684 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001685 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001686 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001687 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001688
1689 for got_line, exp_line in zip(got_lines, exp_lines):
1690 self.assertEquals(got_line, exp_line)
1691 self.assertEquals(len(got_lines), len(exp_lines))
1692
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001693 def test_newlines_input(self):
1694 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001695 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1696 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001697 (None, normalized.decode("ascii").splitlines(True)),
1698 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001699 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1700 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1701 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001702 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001703 buf = self.BytesIO(testdata)
1704 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001705 self.assertEquals(txt.readlines(), expected)
1706 txt.seek(0)
1707 self.assertEquals(txt.read(), "".join(expected))
1708
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001709 def test_newlines_output(self):
1710 testdict = {
1711 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1712 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1713 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1714 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1715 }
1716 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1717 for newline, expected in tests:
1718 buf = self.BytesIO()
1719 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1720 txt.write("AAA\nB")
1721 txt.write("BB\nCCC\n")
1722 txt.write("X\rY\r\nZ")
1723 txt.flush()
1724 self.assertEquals(buf.closed, False)
1725 self.assertEquals(buf.getvalue(), expected)
1726
1727 def test_destructor(self):
1728 l = []
1729 base = self.BytesIO
1730 class MyBytesIO(base):
1731 def close(self):
1732 l.append(self.getvalue())
1733 base.close(self)
1734 b = MyBytesIO()
1735 t = self.TextIOWrapper(b, encoding="ascii")
1736 t.write("abc")
1737 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001738 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001739 self.assertEquals([b"abc"], l)
1740
1741 def test_override_destructor(self):
1742 record = []
1743 class MyTextIO(self.TextIOWrapper):
1744 def __del__(self):
1745 record.append(1)
1746 try:
1747 f = super().__del__
1748 except AttributeError:
1749 pass
1750 else:
1751 f()
1752 def close(self):
1753 record.append(2)
1754 super().close()
1755 def flush(self):
1756 record.append(3)
1757 super().flush()
1758 b = self.BytesIO()
1759 t = MyTextIO(b, encoding="ascii")
1760 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001761 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001762 self.assertEqual(record, [1, 2, 3])
1763
1764 def test_error_through_destructor(self):
1765 # Test that the exception state is not modified by a destructor,
1766 # even if close() fails.
1767 rawio = self.CloseFailureIO()
1768 def f():
1769 self.TextIOWrapper(rawio).xyzzy
1770 with support.captured_output("stderr") as s:
1771 self.assertRaises(AttributeError, f)
1772 s = s.getvalue().strip()
1773 if s:
1774 # The destructor *may* have printed an unraisable error, check it
1775 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001776 self.assertTrue(s.startswith("Exception IOError: "), s)
1777 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001778
Guido van Rossum9b76da62007-04-11 01:09:03 +00001779 # Systematic tests of the text I/O API
1780
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001781 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001782 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1783 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001784 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001785 f._CHUNK_SIZE = chunksize
1786 self.assertEquals(f.write("abc"), 3)
1787 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001788 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001789 f._CHUNK_SIZE = chunksize
1790 self.assertEquals(f.tell(), 0)
1791 self.assertEquals(f.read(), "abc")
1792 cookie = f.tell()
1793 self.assertEquals(f.seek(0), 0)
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001794 self.assertEquals(f.read(None), "abc")
1795 f.seek(0)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001796 self.assertEquals(f.read(2), "ab")
1797 self.assertEquals(f.read(1), "c")
1798 self.assertEquals(f.read(1), "")
1799 self.assertEquals(f.read(), "")
1800 self.assertEquals(f.tell(), cookie)
1801 self.assertEquals(f.seek(0), 0)
1802 self.assertEquals(f.seek(0, 2), cookie)
1803 self.assertEquals(f.write("def"), 3)
1804 self.assertEquals(f.seek(cookie), cookie)
1805 self.assertEquals(f.read(), "def")
1806 if enc.startswith("utf"):
1807 self.multi_line_test(f, enc)
1808 f.close()
1809
1810 def multi_line_test(self, f, enc):
1811 f.seek(0)
1812 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001813 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001814 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001815 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001816 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001817 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001818 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001819 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001820 wlines.append((f.tell(), line))
1821 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001822 f.seek(0)
1823 rlines = []
1824 while True:
1825 pos = f.tell()
1826 line = f.readline()
1827 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001828 break
1829 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +00001830 self.assertEquals(rlines, wlines)
1831
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001832 def test_telling(self):
1833 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001834 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001835 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001836 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001837 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001838 p2 = f.tell()
1839 f.seek(0)
1840 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001841 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001842 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001843 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001844 self.assertEquals(f.tell(), p2)
1845 f.seek(0)
1846 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001847 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001848 self.assertRaises(IOError, f.tell)
1849 self.assertEquals(f.tell(), p2)
1850 f.close()
1851
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001852 def test_seeking(self):
1853 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001854 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001855 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001856 prefix = bytes(u_prefix.encode("utf-8"))
1857 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001858 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001859 suffix = bytes(u_suffix.encode("utf-8"))
1860 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001861 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001862 f.write(line*2)
1863 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001864 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001865 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001866 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001867 self.assertEquals(f.tell(), prefix_size)
1868 self.assertEquals(f.readline(), u_suffix)
1869
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001870 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00001871 # Regression test for a specific bug
1872 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001873 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001874 f.write(data)
1875 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001876 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001877 f._CHUNK_SIZE # Just test that it exists
1878 f._CHUNK_SIZE = 2
1879 f.readline()
1880 f.tell()
1881
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001882 def test_seek_and_tell(self):
1883 #Test seek/tell using the StatefulIncrementalDecoder.
1884 # Make test faster by doing smaller seeks
1885 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001886
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001887 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001888 """Tell/seek to various points within a data stream and ensure
1889 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001890 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001891 f.write(data)
1892 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001893 f = self.open(support.TESTFN, encoding='test_decoder')
1894 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001895 decoded = f.read()
1896 f.close()
1897
Neal Norwitze2b07052008-03-18 19:52:05 +00001898 for i in range(min_pos, len(decoded) + 1): # seek positions
1899 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001900 f = self.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001901 self.assertEquals(f.read(i), decoded[:i])
1902 cookie = f.tell()
1903 self.assertEquals(f.read(j), decoded[i:i + j])
1904 f.seek(cookie)
1905 self.assertEquals(f.read(), decoded[i:])
1906 f.close()
1907
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001908 # Enable the test decoder.
1909 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001910
1911 # Run the tests.
1912 try:
1913 # Try each test case.
1914 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001915 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001916
1917 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001918 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1919 offset = CHUNK_SIZE - len(input)//2
1920 prefix = b'.'*offset
1921 # Don't bother seeking into the prefix (takes too long).
1922 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001923 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001924
1925 # Ensure our test decoder won't interfere with subsequent tests.
1926 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001927 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001928
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001929 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001930 data = "1234567890"
1931 tests = ("utf-16",
1932 "utf-16-le",
1933 "utf-16-be",
1934 "utf-32",
1935 "utf-32-le",
1936 "utf-32-be")
1937 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001938 buf = self.BytesIO()
1939 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001940 # Check if the BOM is written only once (see issue1753).
1941 f.write(data)
1942 f.write(data)
1943 f.seek(0)
1944 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00001945 f.seek(0)
1946 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001947 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1948
Benjamin Petersona1b49012009-03-31 23:11:32 +00001949 def test_unreadable(self):
1950 class UnReadable(self.BytesIO):
1951 def readable(self):
1952 return False
1953 txt = self.TextIOWrapper(UnReadable())
1954 self.assertRaises(IOError, txt.read)
1955
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001956 def test_read_one_by_one(self):
1957 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001958 reads = ""
1959 while True:
1960 c = txt.read(1)
1961 if not c:
1962 break
1963 reads += c
1964 self.assertEquals(reads, "AA\nBB")
1965
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001966 def test_readlines(self):
1967 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
1968 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
1969 txt.seek(0)
1970 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
1971 txt.seek(0)
1972 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
1973
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001974 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001975 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001976 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001977 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001978 reads = ""
1979 while True:
1980 c = txt.read(128)
1981 if not c:
1982 break
1983 reads += c
1984 self.assertEquals(reads, "A"*127+"\nB")
1985
1986 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001987 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001988
1989 # read one char at a time
1990 reads = ""
1991 while True:
1992 c = txt.read(1)
1993 if not c:
1994 break
1995 reads += c
1996 self.assertEquals(reads, self.normalized)
1997
1998 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001999 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002000 txt._CHUNK_SIZE = 4
2001
2002 reads = ""
2003 while True:
2004 c = txt.read(4)
2005 if not c:
2006 break
2007 reads += c
2008 self.assertEquals(reads, self.normalized)
2009
2010 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002011 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002012 txt._CHUNK_SIZE = 4
2013
2014 reads = txt.read(4)
2015 reads += txt.read(4)
2016 reads += txt.readline()
2017 reads += txt.readline()
2018 reads += txt.readline()
2019 self.assertEquals(reads, self.normalized)
2020
2021 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002022 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002023 txt._CHUNK_SIZE = 4
2024
2025 reads = txt.read(4)
2026 reads += txt.read()
2027 self.assertEquals(reads, self.normalized)
2028
2029 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002030 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002031 txt._CHUNK_SIZE = 4
2032
2033 reads = txt.read(4)
2034 pos = txt.tell()
2035 txt.seek(0)
2036 txt.seek(pos)
2037 self.assertEquals(txt.read(4), "BBB\n")
2038
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002039 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002040 buffer = self.BytesIO(self.testdata)
2041 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002042
2043 self.assertEqual(buffer.seekable(), txt.seekable())
2044
Antoine Pitroue4501852009-05-14 18:55:55 +00002045 def test_append_bom(self):
2046 # The BOM is not written again when appending to a non-empty file
2047 filename = support.TESTFN
2048 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2049 with self.open(filename, 'w', encoding=charset) as f:
2050 f.write('aaa')
2051 pos = f.tell()
2052 with self.open(filename, 'rb') as f:
2053 self.assertEquals(f.read(), 'aaa'.encode(charset))
2054
2055 with self.open(filename, 'a', encoding=charset) as f:
2056 f.write('xxx')
2057 with self.open(filename, 'rb') as f:
2058 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
2059
2060 def test_seek_bom(self):
2061 # Same test, but when seeking manually
2062 filename = support.TESTFN
2063 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2064 with self.open(filename, 'w', encoding=charset) as f:
2065 f.write('aaa')
2066 pos = f.tell()
2067 with self.open(filename, 'r+', encoding=charset) as f:
2068 f.seek(pos)
2069 f.write('zzz')
2070 f.seek(0)
2071 f.write('bbb')
2072 with self.open(filename, 'rb') as f:
2073 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2074
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002075 def test_errors_property(self):
2076 with self.open(support.TESTFN, "w") as f:
2077 self.assertEqual(f.errors, "strict")
2078 with self.open(support.TESTFN, "w", errors="replace") as f:
2079 self.assertEqual(f.errors, "replace")
2080
Antoine Pitroue4501852009-05-14 18:55:55 +00002081
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002082 def test_threads_write(self):
2083 # Issue6750: concurrent writes could duplicate data
2084 event = threading.Event()
2085 with self.open(support.TESTFN, "w", buffering=1) as f:
2086 def run(n):
2087 text = "Thread%03d\n" % n
2088 event.wait()
2089 f.write(text)
2090 threads = [threading.Thread(target=lambda n=x: run(n))
2091 for x in range(20)]
2092 for t in threads:
2093 t.start()
2094 time.sleep(0.02)
2095 event.set()
2096 for t in threads:
2097 t.join()
2098 with self.open(support.TESTFN) as f:
2099 content = f.read()
2100 for n in range(20):
2101 self.assertEquals(content.count("Thread%03d\n" % n), 1)
2102
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002103class CTextIOWrapperTest(TextIOWrapperTest):
2104
2105 def test_initialization(self):
2106 r = self.BytesIO(b"\xc3\xa9\n\n")
2107 b = self.BufferedReader(r, 1000)
2108 t = self.TextIOWrapper(b)
2109 self.assertRaises(TypeError, t.__init__, b, newline=42)
2110 self.assertRaises(ValueError, t.read)
2111 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2112 self.assertRaises(ValueError, t.read)
2113
2114 def test_garbage_collection(self):
2115 # C TextIOWrapper objects are collected, and collecting them flushes
2116 # all data to disk.
2117 # The Python version has __del__, so it ends in gc.garbage instead.
2118 rawio = io.FileIO(support.TESTFN, "wb")
2119 b = self.BufferedWriter(rawio)
2120 t = self.TextIOWrapper(b, encoding="ascii")
2121 t.write("456def")
2122 t.x = t
2123 wr = weakref.ref(t)
2124 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002125 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002126 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002127 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002128 self.assertEqual(f.read(), b"456def")
2129
2130class PyTextIOWrapperTest(TextIOWrapperTest):
2131 pass
2132
2133
2134class IncrementalNewlineDecoderTest(unittest.TestCase):
2135
2136 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002137 # UTF-8 specific tests for a newline decoder
2138 def _check_decode(b, s, **kwargs):
2139 # We exercise getstate() / setstate() as well as decode()
2140 state = decoder.getstate()
2141 self.assertEquals(decoder.decode(b, **kwargs), s)
2142 decoder.setstate(state)
2143 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002144
Antoine Pitrou180a3362008-12-14 16:36:46 +00002145 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002146
Antoine Pitrou180a3362008-12-14 16:36:46 +00002147 _check_decode(b'\xe8', "")
2148 _check_decode(b'\xa2', "")
2149 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002150
Antoine Pitrou180a3362008-12-14 16:36:46 +00002151 _check_decode(b'\xe8', "")
2152 _check_decode(b'\xa2', "")
2153 _check_decode(b'\x88', "\u8888")
2154
2155 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002156 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2157
Antoine Pitrou180a3362008-12-14 16:36:46 +00002158 decoder.reset()
2159 _check_decode(b'\n', "\n")
2160 _check_decode(b'\r', "")
2161 _check_decode(b'', "\n", final=True)
2162 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002163
Antoine Pitrou180a3362008-12-14 16:36:46 +00002164 _check_decode(b'\r', "")
2165 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002166
Antoine Pitrou180a3362008-12-14 16:36:46 +00002167 _check_decode(b'\r\r\n', "\n\n")
2168 _check_decode(b'\r', "")
2169 _check_decode(b'\r', "\n")
2170 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002171
Antoine Pitrou180a3362008-12-14 16:36:46 +00002172 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2173 _check_decode(b'\xe8\xa2\x88', "\u8888")
2174 _check_decode(b'\n', "\n")
2175 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2176 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002177
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002178 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002179 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002180 if encoding is not None:
2181 encoder = codecs.getincrementalencoder(encoding)()
2182 def _decode_bytewise(s):
2183 # Decode one byte at a time
2184 for b in encoder.encode(s):
2185 result.append(decoder.decode(bytes([b])))
2186 else:
2187 encoder = None
2188 def _decode_bytewise(s):
2189 # Decode one char at a time
2190 for c in s:
2191 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002192 self.assertEquals(decoder.newlines, None)
2193 _decode_bytewise("abc\n\r")
2194 self.assertEquals(decoder.newlines, '\n')
2195 _decode_bytewise("\nabc")
2196 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2197 _decode_bytewise("abc\r")
2198 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2199 _decode_bytewise("abc")
2200 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2201 _decode_bytewise("abc\r")
2202 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2203 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002204 input = "abc"
2205 if encoder is not None:
2206 encoder.reset()
2207 input = encoder.encode(input)
2208 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002209 self.assertEquals(decoder.newlines, None)
2210
2211 def test_newline_decoder(self):
2212 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002213 # None meaning the IncrementalNewlineDecoder takes unicode input
2214 # rather than bytes input
2215 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002216 'utf-16', 'utf-16-le', 'utf-16-be',
2217 'utf-32', 'utf-32-le', 'utf-32-be',
2218 )
2219 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002220 decoder = enc and codecs.getincrementaldecoder(enc)()
2221 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2222 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002223 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002224 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2225 self.check_newline_decoding_utf8(decoder)
2226
Antoine Pitrou66913e22009-03-06 23:40:56 +00002227 def test_newline_bytes(self):
2228 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2229 def _check(dec):
2230 self.assertEquals(dec.newlines, None)
2231 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2232 self.assertEquals(dec.newlines, None)
2233 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2234 self.assertEquals(dec.newlines, None)
2235 dec = self.IncrementalNewlineDecoder(None, translate=False)
2236 _check(dec)
2237 dec = self.IncrementalNewlineDecoder(None, translate=True)
2238 _check(dec)
2239
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002240class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2241 pass
2242
2243class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2244 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002245
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002246
Guido van Rossum01a27522007-03-07 01:00:12 +00002247# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002248
Guido van Rossum5abbf752007-08-27 17:39:33 +00002249class MiscIOTest(unittest.TestCase):
2250
Barry Warsaw40e82462008-11-20 20:14:50 +00002251 def tearDown(self):
2252 support.unlink(support.TESTFN)
2253
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002254 def test___all__(self):
2255 for name in self.io.__all__:
2256 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002257 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002258 if name == "open":
2259 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002260 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002261 self.assertTrue(issubclass(obj, Exception), name)
2262 elif not name.startswith("SEEK_"):
2263 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002264
Barry Warsaw40e82462008-11-20 20:14:50 +00002265 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002266 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002267 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002268 f.close()
2269
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002270 f = self.open(support.TESTFN, "U")
Barry Warsaw40e82462008-11-20 20:14:50 +00002271 self.assertEquals(f.name, support.TESTFN)
2272 self.assertEquals(f.buffer.name, support.TESTFN)
2273 self.assertEquals(f.buffer.raw.name, support.TESTFN)
2274 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002275 self.assertEquals(f.buffer.mode, "rb")
2276 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002277 f.close()
2278
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002279 f = self.open(support.TESTFN, "w+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002280 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002281 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2282 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002283
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002284 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002285 self.assertEquals(g.mode, "wb")
2286 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002287 self.assertEquals(g.name, f.fileno())
2288 self.assertEquals(g.raw.name, f.fileno())
2289 f.close()
2290 g.close()
2291
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002292 def test_io_after_close(self):
2293 for kwargs in [
2294 {"mode": "w"},
2295 {"mode": "wb"},
2296 {"mode": "w", "buffering": 1},
2297 {"mode": "w", "buffering": 2},
2298 {"mode": "wb", "buffering": 0},
2299 {"mode": "r"},
2300 {"mode": "rb"},
2301 {"mode": "r", "buffering": 1},
2302 {"mode": "r", "buffering": 2},
2303 {"mode": "rb", "buffering": 0},
2304 {"mode": "w+"},
2305 {"mode": "w+b"},
2306 {"mode": "w+", "buffering": 1},
2307 {"mode": "w+", "buffering": 2},
2308 {"mode": "w+b", "buffering": 0},
2309 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002310 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002311 f.close()
2312 self.assertRaises(ValueError, f.flush)
2313 self.assertRaises(ValueError, f.fileno)
2314 self.assertRaises(ValueError, f.isatty)
2315 self.assertRaises(ValueError, f.__iter__)
2316 if hasattr(f, "peek"):
2317 self.assertRaises(ValueError, f.peek, 1)
2318 self.assertRaises(ValueError, f.read)
2319 if hasattr(f, "read1"):
2320 self.assertRaises(ValueError, f.read1, 1024)
2321 if hasattr(f, "readinto"):
2322 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2323 self.assertRaises(ValueError, f.readline)
2324 self.assertRaises(ValueError, f.readlines)
2325 self.assertRaises(ValueError, f.seek, 0)
2326 self.assertRaises(ValueError, f.tell)
2327 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002328 self.assertRaises(ValueError, f.write,
2329 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002330 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002331 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002332
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002333 def test_blockingioerror(self):
2334 # Various BlockingIOError issues
2335 self.assertRaises(TypeError, self.BlockingIOError)
2336 self.assertRaises(TypeError, self.BlockingIOError, 1)
2337 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2338 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2339 b = self.BlockingIOError(1, "")
2340 self.assertEqual(b.characters_written, 0)
2341 class C(str):
2342 pass
2343 c = C("")
2344 b = self.BlockingIOError(1, c)
2345 c.b = b
2346 b.c = c
2347 wr = weakref.ref(c)
2348 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002349 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002350 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002351
2352 def test_abcs(self):
2353 # Test the visible base classes are ABCs.
2354 self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
2355 self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
2356 self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
2357 self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))
2358
2359 def _check_abc_inheritance(self, abcmodule):
2360 with self.open(support.TESTFN, "wb", buffering=0) as f:
2361 self.assertTrue(isinstance(f, abcmodule.IOBase))
2362 self.assertTrue(isinstance(f, abcmodule.RawIOBase))
2363 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2364 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2365 with self.open(support.TESTFN, "wb") as f:
2366 self.assertTrue(isinstance(f, abcmodule.IOBase))
2367 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2368 self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
2369 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2370 with self.open(support.TESTFN, "w") as f:
2371 self.assertTrue(isinstance(f, abcmodule.IOBase))
2372 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2373 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2374 self.assertTrue(isinstance(f, abcmodule.TextIOBase))
2375
2376 def test_abc_inheritance(self):
2377 # Test implementations inherit from their respective ABCs
2378 self._check_abc_inheritance(self)
2379
2380 def test_abc_inheritance_official(self):
2381 # Test implementations inherit from the official ABCs of the
2382 # baseline "io" module.
2383 self._check_abc_inheritance(io)
2384
2385class CMiscIOTest(MiscIOTest):
2386 io = io
2387
2388class PyMiscIOTest(MiscIOTest):
2389 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002390
Guido van Rossum28524c72007-02-27 05:47:44 +00002391def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002392 tests = (CIOTest, PyIOTest,
2393 CBufferedReaderTest, PyBufferedReaderTest,
2394 CBufferedWriterTest, PyBufferedWriterTest,
2395 CBufferedRWPairTest, PyBufferedRWPairTest,
2396 CBufferedRandomTest, PyBufferedRandomTest,
2397 StatefulIncrementalDecoderTest,
2398 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2399 CTextIOWrapperTest, PyTextIOWrapperTest,
2400 CMiscIOTest, PyMiscIOTest,)
2401
2402 # Put the namespaces of the IO module we are testing and some useful mock
2403 # classes in the __dict__ of each test.
2404 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
2405 MockNonBlockWriterIO)
2406 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2407 c_io_ns = {name : getattr(io, name) for name in all_members}
2408 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2409 globs = globals()
2410 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2411 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2412 # Avoid turning open into a bound method.
2413 py_io_ns["open"] = pyio.OpenWrapper
2414 for test in tests:
2415 if test.__name__.startswith("C"):
2416 for name, obj in c_io_ns.items():
2417 setattr(test, name, obj)
2418 elif test.__name__.startswith("Py"):
2419 for name, obj in py_io_ns.items():
2420 setattr(test, name, obj)
2421
2422 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002423
2424if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002425 test_main()