blob: f6175c8a563e28c54e3c849d44c44abc5be1bc6b [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
33from itertools import chain, cycle, count
34from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000035from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000036
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000037import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000038import io # C implementation of io
39import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000040
Guido van Rossuma9e20242007-03-08 00:43:48 +000041
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042def _default_chunk_size():
43 """Get the default TextIOWrapper chunk size"""
44 with open(__file__, "r", encoding="latin1") as f:
45 return f._CHUNK_SIZE
46
47
48class MockRawIO:
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000050 def __init__(self, read_stack=()):
51 self._read_stack = list(read_stack)
52 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000053 self._reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000054
55 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000056 self._reads += 1
Guido van Rossum68bbcd22007-02-27 17:19:33 +000057 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000058 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000059 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000060 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000061
Guido van Rossum01a27522007-03-07 01:00:12 +000062 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000063 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000064 return len(b)
65
66 def writable(self):
67 return True
68
Guido van Rossum68bbcd22007-02-27 17:19:33 +000069 def fileno(self):
70 return 42
71
72 def readable(self):
73 return True
74
Guido van Rossum01a27522007-03-07 01:00:12 +000075 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000076 return True
77
Guido van Rossum01a27522007-03-07 01:00:12 +000078 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000079 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000080
81 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000082 return 0 # same comment as above
83
84 def readinto(self, buf):
85 self._reads += 1
86 max_len = len(buf)
87 try:
88 data = self._read_stack[0]
89 except IndexError:
90 return 0
91 if data is None:
92 del self._read_stack[0]
93 return None
94 n = len(data)
95 if len(data) <= max_len:
96 del self._read_stack[0]
97 buf[:n] = data
98 return n
99 else:
100 buf[:] = data[:max_len]
101 self._read_stack[0] = data[max_len:]
102 return max_len
103
104 def truncate(self, pos=None):
105 return pos
106
107class CMockRawIO(MockRawIO, io.RawIOBase):
108 pass
109
110class PyMockRawIO(MockRawIO, pyio.RawIOBase):
111 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000112
Guido van Rossuma9e20242007-03-08 00:43:48 +0000113
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000114class MisbehavedRawIO(MockRawIO):
115 def write(self, b):
116 return super().write(b) * 2
117
118 def read(self, n=None):
119 return super().read(n) * 2
120
121 def seek(self, pos, whence):
122 return -123
123
124 def tell(self):
125 return -456
126
127 def readinto(self, buf):
128 super().readinto(buf)
129 return len(buf) * 5
130
131class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
132 pass
133
134class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
135 pass
136
137
138class CloseFailureIO(MockRawIO):
139 closed = 0
140
141 def close(self):
142 if not self.closed:
143 self.closed = 1
144 raise IOError
145
146class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
147 pass
148
149class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
150 pass
151
152
153class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000154
155 def __init__(self, data):
156 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000157 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000158
159 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000160 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000161 self.read_history.append(None if res is None else len(res))
162 return res
163
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000164 def readinto(self, b):
165 res = super().readinto(b)
166 self.read_history.append(res)
167 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000168
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169class CMockFileIO(MockFileIO, io.BytesIO):
170 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000171
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000172class PyMockFileIO(MockFileIO, pyio.BytesIO):
173 pass
174
175
176class MockNonBlockWriterIO:
177
178 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000179 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000181
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000182 def pop_written(self):
183 s = b"".join(self._write_stack)
184 self._write_stack[:] = []
185 return s
186
187 def block_on(self, char):
188 """Block when a given char is encountered."""
189 self._blocker_char = char
190
191 def readable(self):
192 return True
193
194 def seekable(self):
195 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000196
Guido van Rossum01a27522007-03-07 01:00:12 +0000197 def writable(self):
198 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000200 def write(self, b):
201 b = bytes(b)
202 n = -1
203 if self._blocker_char:
204 try:
205 n = b.index(self._blocker_char)
206 except ValueError:
207 pass
208 else:
209 self._blocker_char = None
210 self._write_stack.append(b[:n])
211 raise self.BlockingIOError(0, "test blocking", n)
212 self._write_stack.append(b)
213 return len(b)
214
215class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
216 BlockingIOError = io.BlockingIOError
217
218class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
219 BlockingIOError = pyio.BlockingIOError
220
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
Guido van Rossum28524c72007-02-27 05:47:44 +0000222class IOTest(unittest.TestCase):
223
Neal Norwitze7789b12008-03-24 06:18:09 +0000224 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000225 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000226
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000227 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000228 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000229
Guido van Rossum28524c72007-02-27 05:47:44 +0000230 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000231 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +0000232 f.truncate(0)
233 self.assertEqual(f.tell(), 5)
234 f.seek(0)
235
236 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000237 self.assertEqual(f.seek(0), 0)
238 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000239 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000240 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000241 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000242 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000243 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000244 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000245 self.assertEqual(f.seek(-1, 2), 13)
246 self.assertEqual(f.tell(), 13)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +0000247
Guido van Rossum87429772007-04-10 21:06:59 +0000248 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +0000249 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000250 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000251
Guido van Rossum9b76da62007-04-11 01:09:03 +0000252 def read_ops(self, f, buffered=False):
253 data = f.read(5)
254 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000255 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000256 self.assertEqual(f.readinto(data), 5)
257 self.assertEqual(data, b" worl")
258 self.assertEqual(f.readinto(data), 2)
259 self.assertEqual(len(data), 5)
260 self.assertEqual(data[:2], b"d\n")
261 self.assertEqual(f.seek(0), 0)
262 self.assertEqual(f.read(20), b"hello world\n")
263 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000264 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000265 self.assertEqual(f.seek(-6, 2), 6)
266 self.assertEqual(f.read(5), b"world")
267 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000268 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000269 self.assertEqual(f.seek(-6, 1), 5)
270 self.assertEqual(f.read(5), b" worl")
271 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000272 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000273 if buffered:
274 f.seek(0)
275 self.assertEqual(f.read(), b"hello world\n")
276 f.seek(6)
277 self.assertEqual(f.read(), b"world\n")
278 self.assertEqual(f.read(), b"")
279
Guido van Rossum34d69e52007-04-10 20:08:41 +0000280 LARGE = 2**31
281
Guido van Rossum53807da2007-04-10 19:01:47 +0000282 def large_file_ops(self, f):
283 assert f.readable()
284 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000285 self.assertEqual(f.seek(self.LARGE), self.LARGE)
286 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000287 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000288 self.assertEqual(f.tell(), self.LARGE + 3)
289 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000290 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000291 self.assertEqual(f.tell(), self.LARGE + 2)
292 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000293 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +0000294 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000295 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
296 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000297 self.assertEqual(f.read(2), b"x")
298
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000299 def test_invalid_operations(self):
300 # Try writing on a file opened in read mode and vice-versa.
301 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000302 with self.open(support.TESTFN, mode) as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000303 self.assertRaises(IOError, fp.read)
304 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000305 with self.open(support.TESTFN, "rb") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000306 self.assertRaises(IOError, fp.write, b"blah")
307 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000308 with self.open(support.TESTFN, "r") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000309 self.assertRaises(IOError, fp.write, "blah")
310 self.assertRaises(IOError, fp.writelines, ["blah\n"])
311
Guido van Rossum28524c72007-02-27 05:47:44 +0000312 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000313 with self.open(support.TESTFN, "wb", buffering=0) as f:
314 self.assertEqual(f.readable(), False)
315 self.assertEqual(f.writable(), True)
316 self.assertEqual(f.seekable(), True)
317 self.write_ops(f)
318 with self.open(support.TESTFN, "rb", buffering=0) as f:
319 self.assertEqual(f.readable(), True)
320 self.assertEqual(f.writable(), False)
321 self.assertEqual(f.seekable(), True)
322 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000323
Guido van Rossum87429772007-04-10 21:06:59 +0000324 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000325 with self.open(support.TESTFN, "wb") as f:
326 self.assertEqual(f.readable(), False)
327 self.assertEqual(f.writable(), True)
328 self.assertEqual(f.seekable(), True)
329 self.write_ops(f)
330 with self.open(support.TESTFN, "rb") as f:
331 self.assertEqual(f.readable(), True)
332 self.assertEqual(f.writable(), False)
333 self.assertEqual(f.seekable(), True)
334 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000335
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000336 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000337 with self.open(support.TESTFN, "wb") as f:
338 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
339 with self.open(support.TESTFN, "rb") as f:
340 self.assertEqual(f.readline(), b"abc\n")
341 self.assertEqual(f.readline(10), b"def\n")
342 self.assertEqual(f.readline(2), b"xy")
343 self.assertEqual(f.readline(4), b"zzy\n")
344 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Peterson6b59f772009-12-13 19:30:15 +0000345 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000346 self.assertRaises(TypeError, f.readline, 5.3)
347 with self.open(support.TESTFN, "r") as f:
348 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000349
Guido van Rossum28524c72007-02-27 05:47:44 +0000350 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000351 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000352 self.write_ops(f)
353 data = f.getvalue()
354 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000355 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000356 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000357
Guido van Rossum53807da2007-04-10 19:01:47 +0000358 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000359 # On Windows and Mac OSX this test comsumes large resources; It takes
360 # a long time to build the >2GB file and takes >2GB of disk space
361 # therefore the resource must be enabled to run this test.
362 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000363 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000364 print("\nTesting large file ops skipped on %s." % sys.platform,
365 file=sys.stderr)
366 print("It requires %d bytes and a long time." % self.LARGE,
367 file=sys.stderr)
368 print("Use 'regrtest.py -u largefile test_io' to run it.",
369 file=sys.stderr)
370 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000371 with self.open(support.TESTFN, "w+b", 0) as f:
372 self.large_file_ops(f)
373 with self.open(support.TESTFN, "w+b") as f:
374 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000375
376 def test_with_open(self):
377 for bufsize in (0, 1, 100):
378 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000379 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000380 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000381 self.assertEqual(f.closed, True)
382 f = None
383 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000384 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000385 1/0
386 except ZeroDivisionError:
387 self.assertEqual(f.closed, True)
388 else:
389 self.fail("1/0 didn't raise an exception")
390
Antoine Pitrou08838b62009-01-21 00:55:13 +0000391 # issue 5008
392 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000393 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000394 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000395 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000396 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000397 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000398 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000399 with self.open(support.TESTFN, "a") as f:
Georg Brandlab91fde2009-08-13 08:51:18 +0000400 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000401
Guido van Rossum87429772007-04-10 21:06:59 +0000402 def test_destructor(self):
403 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000404 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000405 def __del__(self):
406 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000407 try:
408 f = super().__del__
409 except AttributeError:
410 pass
411 else:
412 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000413 def close(self):
414 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000415 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000416 def flush(self):
417 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000418 super().flush()
419 f = MyFileIO(support.TESTFN, "wb")
420 f.write(b"xxx")
421 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000422 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000423 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000424 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000425 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000426
427 def _check_base_destructor(self, base):
428 record = []
429 class MyIO(base):
430 def __init__(self):
431 # This exercises the availability of attributes on object
432 # destruction.
433 # (in the C version, close() is called by the tp_dealloc
434 # function, not by __del__)
435 self.on_del = 1
436 self.on_close = 2
437 self.on_flush = 3
438 def __del__(self):
439 record.append(self.on_del)
440 try:
441 f = super().__del__
442 except AttributeError:
443 pass
444 else:
445 f()
446 def close(self):
447 record.append(self.on_close)
448 super().close()
449 def flush(self):
450 record.append(self.on_flush)
451 super().flush()
452 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000453 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000454 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000455 self.assertEqual(record, [1, 2, 3])
456
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000457 def test_IOBase_destructor(self):
458 self._check_base_destructor(self.IOBase)
459
460 def test_RawIOBase_destructor(self):
461 self._check_base_destructor(self.RawIOBase)
462
463 def test_BufferedIOBase_destructor(self):
464 self._check_base_destructor(self.BufferedIOBase)
465
466 def test_TextIOBase_destructor(self):
467 self._check_base_destructor(self.TextIOBase)
468
Guido van Rossum87429772007-04-10 21:06:59 +0000469 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000470 with self.open(support.TESTFN, "wb") as f:
471 f.write(b"xxx")
472 with self.open(support.TESTFN, "rb") as f:
473 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000474
Guido van Rossumd4103952007-04-12 05:44:49 +0000475 def test_array_writes(self):
476 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000477 n = len(a.tostring())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000478 with self.open(support.TESTFN, "wb", 0) as f:
479 self.assertEqual(f.write(a), n)
480 with self.open(support.TESTFN, "wb") as f:
481 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000482
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000483 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000484 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000485 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000486
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000487 def test_read_closed(self):
488 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000489 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000490 with self.open(support.TESTFN, "r") as f:
491 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000492 self.assertEqual(file.read(), "egg\n")
493 file.seek(0)
494 file.close()
495 self.assertRaises(ValueError, file.read)
496
497 def test_no_closefd_with_filename(self):
498 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000499 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000500
501 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000502 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000503 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000504 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000505 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000506 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000507 self.assertEqual(file.buffer.raw.closefd, False)
508
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000509 def test_garbage_collection(self):
510 # FileIO objects are collected, and collecting them flushes
511 # all data to disk.
512 f = self.FileIO(support.TESTFN, "wb")
513 f.write(b"abcxxx")
514 f.f = f
515 wr = weakref.ref(f)
516 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000517 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +0000518 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000519 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000520 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000521
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000522 def test_unbounded_file(self):
523 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
524 zero = "/dev/zero"
525 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000526 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000527 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000528 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000529 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000530 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000531 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000532 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000533 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000534 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000535 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000536 self.assertRaises(OverflowError, f.read)
537
Antoine Pitroufaf90072010-05-03 16:58:19 +0000538 def test_flush_error_on_close(self):
539 f = self.open(support.TESTFN, "wb", buffering=0)
540 def bad_flush():
541 raise IOError()
542 f.flush = bad_flush
543 self.assertRaises(IOError, f.close) # exception not swallowed
544
545 def test_multi_close(self):
546 f = self.open(support.TESTFN, "wb", buffering=0)
547 f.close()
548 f.close()
549 f.close()
550 self.assertRaises(ValueError, f.flush)
551
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000552class CIOTest(IOTest):
553 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000554
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000555class PyIOTest(IOTest):
556 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000557
Guido van Rossuma9e20242007-03-08 00:43:48 +0000558
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000559class CommonBufferedTests:
560 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
561
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000562 def test_detach(self):
563 raw = self.MockRawIO()
564 buf = self.tp(raw)
565 self.assertIs(buf.detach(), raw)
566 self.assertRaises(ValueError, buf.detach)
567
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000568 def test_fileno(self):
569 rawio = self.MockRawIO()
570 bufio = self.tp(rawio)
571
572 self.assertEquals(42, bufio.fileno())
573
574 def test_no_fileno(self):
575 # XXX will we always have fileno() function? If so, kill
576 # this test. Else, write it.
577 pass
578
579 def test_invalid_args(self):
580 rawio = self.MockRawIO()
581 bufio = self.tp(rawio)
582 # Invalid whence
583 self.assertRaises(ValueError, bufio.seek, 0, -1)
584 self.assertRaises(ValueError, bufio.seek, 0, 3)
585
586 def test_override_destructor(self):
587 tp = self.tp
588 record = []
589 class MyBufferedIO(tp):
590 def __del__(self):
591 record.append(1)
592 try:
593 f = super().__del__
594 except AttributeError:
595 pass
596 else:
597 f()
598 def close(self):
599 record.append(2)
600 super().close()
601 def flush(self):
602 record.append(3)
603 super().flush()
604 rawio = self.MockRawIO()
605 bufio = MyBufferedIO(rawio)
606 writable = bufio.writable()
607 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000608 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000609 if writable:
610 self.assertEqual(record, [1, 2, 3])
611 else:
612 self.assertEqual(record, [1, 2])
613
614 def test_context_manager(self):
615 # Test usability as a context manager
616 rawio = self.MockRawIO()
617 bufio = self.tp(rawio)
618 def _with():
619 with bufio:
620 pass
621 _with()
622 # bufio should now be closed, and using it a second time should raise
623 # a ValueError.
624 self.assertRaises(ValueError, _with)
625
626 def test_error_through_destructor(self):
627 # Test that the exception state is not modified by a destructor,
628 # even if close() fails.
629 rawio = self.CloseFailureIO()
630 def f():
631 self.tp(rawio).xyzzy
632 with support.captured_output("stderr") as s:
633 self.assertRaises(AttributeError, f)
634 s = s.getvalue().strip()
635 if s:
636 # The destructor *may* have printed an unraisable error, check it
637 self.assertEqual(len(s.splitlines()), 1)
Georg Brandlab91fde2009-08-13 08:51:18 +0000638 self.assertTrue(s.startswith("Exception IOError: "), s)
639 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000640
Antoine Pitrou716c4442009-05-23 19:04:03 +0000641 def test_repr(self):
642 raw = self.MockRawIO()
643 b = self.tp(raw)
644 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
645 self.assertEqual(repr(b), "<%s>" % clsname)
646 raw.name = "dummy"
647 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
648 raw.name = b"dummy"
649 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
650
Antoine Pitroufaf90072010-05-03 16:58:19 +0000651 def test_flush_error_on_close(self):
652 raw = self.MockRawIO()
653 def bad_flush():
654 raise IOError()
655 raw.flush = bad_flush
656 b = self.tp(raw)
657 self.assertRaises(IOError, b.close) # exception not swallowed
658
659 def test_multi_close(self):
660 raw = self.MockRawIO()
661 b = self.tp(raw)
662 b.close()
663 b.close()
664 b.close()
665 self.assertRaises(ValueError, b.flush)
666
Guido van Rossum78892e42007-04-06 17:31:18 +0000667
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000668class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
669 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000670
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000671 def test_constructor(self):
672 rawio = self.MockRawIO([b"abc"])
673 bufio = self.tp(rawio)
674 bufio.__init__(rawio)
675 bufio.__init__(rawio, buffer_size=1024)
676 bufio.__init__(rawio, buffer_size=16)
677 self.assertEquals(b"abc", bufio.read())
678 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
679 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
680 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
681 rawio = self.MockRawIO([b"abc"])
682 bufio.__init__(rawio)
683 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000684
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000685 def test_read(self):
Benjamin Peterson6b59f772009-12-13 19:30:15 +0000686 for arg in (None, 7):
687 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
688 bufio = self.tp(rawio)
689 self.assertEquals(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000690 # Invalid args
691 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000692
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000693 def test_read1(self):
694 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
695 bufio = self.tp(rawio)
696 self.assertEquals(b"a", bufio.read(1))
697 self.assertEquals(b"b", bufio.read1(1))
698 self.assertEquals(rawio._reads, 1)
699 self.assertEquals(b"c", bufio.read1(100))
700 self.assertEquals(rawio._reads, 1)
701 self.assertEquals(b"d", bufio.read1(100))
702 self.assertEquals(rawio._reads, 2)
703 self.assertEquals(b"efg", bufio.read1(100))
704 self.assertEquals(rawio._reads, 3)
705 self.assertEquals(b"", bufio.read1(100))
Benjamin Peterson6b59f772009-12-13 19:30:15 +0000706 self.assertEquals(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000707 # Invalid args
708 self.assertRaises(ValueError, bufio.read1, -1)
709
710 def test_readinto(self):
711 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
712 bufio = self.tp(rawio)
713 b = bytearray(2)
714 self.assertEquals(bufio.readinto(b), 2)
715 self.assertEquals(b, b"ab")
716 self.assertEquals(bufio.readinto(b), 2)
717 self.assertEquals(b, b"cd")
718 self.assertEquals(bufio.readinto(b), 2)
719 self.assertEquals(b, b"ef")
720 self.assertEquals(bufio.readinto(b), 1)
721 self.assertEquals(b, b"gf")
722 self.assertEquals(bufio.readinto(b), 0)
723 self.assertEquals(b, b"gf")
724
Benjamin Peterson6b59f772009-12-13 19:30:15 +0000725 def test_readlines(self):
726 def bufio():
727 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
728 return self.tp(rawio)
729 self.assertEquals(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
730 self.assertEquals(bufio().readlines(5), [b"abc\n", b"d\n"])
731 self.assertEquals(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
732
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000733 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000734 data = b"abcdefghi"
735 dlen = len(data)
736
737 tests = [
738 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
739 [ 100, [ 3, 3, 3], [ dlen ] ],
740 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
741 ]
742
743 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000744 rawio = self.MockFileIO(data)
745 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000746 pos = 0
747 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000748 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000749 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000750 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000751 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000752
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000753 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000754 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000755 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
756 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000757
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000758 self.assertEquals(b"abcd", bufio.read(6))
759 self.assertEquals(b"e", bufio.read(1))
760 self.assertEquals(b"fg", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000761 self.assertEquals(b"", bufio.peek(1))
Georg Brandlab91fde2009-08-13 08:51:18 +0000762 self.assertTrue(None is bufio.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000763 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000764
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000765 def test_read_past_eof(self):
766 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
767 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000768
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000769 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000770
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000771 def test_read_all(self):
772 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
773 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000774
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000775 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000776
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000777 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000778 try:
779 # Write out many bytes with exactly the same number of 0's,
780 # 1's... 255's. This will help us check that concurrent reading
781 # doesn't duplicate or forget contents.
782 N = 1000
783 l = list(range(256)) * N
784 random.shuffle(l)
785 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000786 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000787 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000788 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000789 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000790 errors = []
791 results = []
792 def f():
793 try:
794 # Intra-buffer read then buffer-flushing read
795 for n in cycle([1, 19]):
796 s = bufio.read(n)
797 if not s:
798 break
799 # list.append() is atomic
800 results.append(s)
801 except Exception as e:
802 errors.append(e)
803 raise
804 threads = [threading.Thread(target=f) for x in range(20)]
805 for t in threads:
806 t.start()
807 time.sleep(0.02) # yield
808 for t in threads:
809 t.join()
810 self.assertFalse(errors,
811 "the following exceptions were caught: %r" % errors)
812 s = b''.join(results)
813 for i in range(256):
814 c = bytes(bytearray([i]))
815 self.assertEqual(s.count(c), N)
816 finally:
817 support.unlink(support.TESTFN)
818
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000819 def test_misbehaved_io(self):
820 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
821 bufio = self.tp(rawio)
822 self.assertRaises(IOError, bufio.seek, 0)
823 self.assertRaises(IOError, bufio.tell)
824
825class CBufferedReaderTest(BufferedReaderTest):
826 tp = io.BufferedReader
827
828 def test_constructor(self):
829 BufferedReaderTest.test_constructor(self)
830 # The allocation can succeed on 32-bit builds, e.g. with more
831 # than 2GB RAM and a 64-bit kernel.
832 if sys.maxsize > 0x7FFFFFFF:
833 rawio = self.MockRawIO()
834 bufio = self.tp(rawio)
835 self.assertRaises((OverflowError, MemoryError, ValueError),
836 bufio.__init__, rawio, sys.maxsize)
837
838 def test_initialization(self):
839 rawio = self.MockRawIO([b"abc"])
840 bufio = self.tp(rawio)
841 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
842 self.assertRaises(ValueError, bufio.read)
843 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
844 self.assertRaises(ValueError, bufio.read)
845 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
846 self.assertRaises(ValueError, bufio.read)
847
848 def test_misbehaved_io_read(self):
849 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
850 bufio = self.tp(rawio)
851 # _pyio.BufferedReader seems to implement reading different, so that
852 # checking this is not so easy.
853 self.assertRaises(IOError, bufio.read, 10)
854
855 def test_garbage_collection(self):
856 # C BufferedReader objects are collected.
857 # The Python version has __del__, so it ends into gc.garbage instead
858 rawio = self.FileIO(support.TESTFN, "w+b")
859 f = self.tp(rawio)
860 f.f = f
861 wr = weakref.ref(f)
862 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000863 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +0000864 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000865
866class PyBufferedReaderTest(BufferedReaderTest):
867 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000868
Guido van Rossuma9e20242007-03-08 00:43:48 +0000869
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000870class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
871 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000872
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000873 def test_constructor(self):
874 rawio = self.MockRawIO()
875 bufio = self.tp(rawio)
876 bufio.__init__(rawio)
877 bufio.__init__(rawio, buffer_size=1024)
878 bufio.__init__(rawio, buffer_size=16)
879 self.assertEquals(3, bufio.write(b"abc"))
880 bufio.flush()
881 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
882 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
883 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
884 bufio.__init__(rawio)
885 self.assertEquals(3, bufio.write(b"ghi"))
886 bufio.flush()
887 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
888
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000889 def test_detach_flush(self):
890 raw = self.MockRawIO()
891 buf = self.tp(raw)
892 buf.write(b"howdy!")
893 self.assertFalse(raw._write_stack)
894 buf.detach()
895 self.assertEqual(raw._write_stack, [b"howdy!"])
896
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000897 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000898 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000899 writer = self.MockRawIO()
900 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000901 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000902 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000903
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000904 def test_write_overflow(self):
905 writer = self.MockRawIO()
906 bufio = self.tp(writer, 8)
907 contents = b"abcdefghijklmnop"
908 for n in range(0, len(contents), 3):
909 bufio.write(contents[n:n+3])
910 flushed = b"".join(writer._write_stack)
911 # At least (total - 8) bytes were implicitly flushed, perhaps more
912 # depending on the implementation.
Georg Brandlab91fde2009-08-13 08:51:18 +0000913 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000914
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000915 def check_writes(self, intermediate_func):
916 # Lots of writes, test the flushed output is as expected.
917 contents = bytes(range(256)) * 1000
918 n = 0
919 writer = self.MockRawIO()
920 bufio = self.tp(writer, 13)
921 # Generator of write sizes: repeat each N 15 times then proceed to N+1
922 def gen_sizes():
923 for size in count(1):
924 for i in range(15):
925 yield size
926 sizes = gen_sizes()
927 while n < len(contents):
928 size = min(next(sizes), len(contents) - n)
929 self.assertEquals(bufio.write(contents[n:n+size]), size)
930 intermediate_func(bufio)
931 n += size
932 bufio.flush()
933 self.assertEquals(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000934
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000935 def test_writes(self):
936 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000937
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000938 def test_writes_and_flushes(self):
939 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +0000940
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000941 def test_writes_and_seeks(self):
942 def _seekabs(bufio):
943 pos = bufio.tell()
944 bufio.seek(pos + 1, 0)
945 bufio.seek(pos - 1, 0)
946 bufio.seek(pos, 0)
947 self.check_writes(_seekabs)
948 def _seekrel(bufio):
949 pos = bufio.seek(0, 1)
950 bufio.seek(+1, 1)
951 bufio.seek(-1, 1)
952 bufio.seek(pos, 0)
953 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +0000954
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000955 def test_writes_and_truncates(self):
956 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +0000957
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000958 def test_write_non_blocking(self):
959 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +0000960 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +0000961
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000962 self.assertEquals(bufio.write(b"abcd"), 4)
963 self.assertEquals(bufio.write(b"efghi"), 5)
964 # 1 byte will be written, the rest will be buffered
965 raw.block_on(b"k")
966 self.assertEquals(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +0000967
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000968 # 8 bytes will be written, 8 will be buffered and the rest will be lost
969 raw.block_on(b"0")
970 try:
971 bufio.write(b"opqrwxyz0123456789")
972 except self.BlockingIOError as e:
973 written = e.characters_written
974 else:
975 self.fail("BlockingIOError should have been raised")
976 self.assertEquals(written, 16)
977 self.assertEquals(raw.pop_written(),
978 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +0000979
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000980 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
981 s = raw.pop_written()
982 # Previously buffered bytes were flushed
983 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +0000984
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000985 def test_write_and_rewind(self):
986 raw = io.BytesIO()
987 bufio = self.tp(raw, 4)
988 self.assertEqual(bufio.write(b"abcdef"), 6)
989 self.assertEqual(bufio.tell(), 6)
990 bufio.seek(0, 0)
991 self.assertEqual(bufio.write(b"XY"), 2)
992 bufio.seek(6, 0)
993 self.assertEqual(raw.getvalue(), b"XYcdef")
994 self.assertEqual(bufio.write(b"123456"), 6)
995 bufio.flush()
996 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000997
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000998 def test_flush(self):
999 writer = self.MockRawIO()
1000 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001001 bufio.write(b"abc")
1002 bufio.flush()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001003 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001004
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001005 def test_destructor(self):
1006 writer = self.MockRawIO()
1007 bufio = self.tp(writer, 8)
1008 bufio.write(b"abc")
1009 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001010 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001011 self.assertEquals(b"abc", writer._write_stack[0])
1012
1013 def test_truncate(self):
1014 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001015 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001016 bufio = self.tp(raw, 8)
1017 bufio.write(b"abcdef")
1018 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou66f9fea2010-01-31 23:20:26 +00001019 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001020 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001021 self.assertEqual(f.read(), b"abc")
1022
1023 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001024 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001025 # Write out many bytes from many threads and test they were
1026 # all flushed.
1027 N = 1000
1028 contents = bytes(range(256)) * N
1029 sizes = cycle([1, 19])
1030 n = 0
1031 queue = deque()
1032 while n < len(contents):
1033 size = next(sizes)
1034 queue.append(contents[n:n+size])
1035 n += size
1036 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001037 # We use a real file object because it allows us to
1038 # exercise situations where the GIL is released before
1039 # writing the buffer to the raw streams. This is in addition
1040 # to concurrency issues due to switching threads in the middle
1041 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001042 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001043 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001044 errors = []
1045 def f():
1046 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001047 while True:
1048 try:
1049 s = queue.popleft()
1050 except IndexError:
1051 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001052 bufio.write(s)
1053 except Exception as e:
1054 errors.append(e)
1055 raise
1056 threads = [threading.Thread(target=f) for x in range(20)]
1057 for t in threads:
1058 t.start()
1059 time.sleep(0.02) # yield
1060 for t in threads:
1061 t.join()
1062 self.assertFalse(errors,
1063 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001064 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001065 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001066 s = f.read()
1067 for i in range(256):
1068 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001069 finally:
1070 support.unlink(support.TESTFN)
1071
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001072 def test_misbehaved_io(self):
1073 rawio = self.MisbehavedRawIO()
1074 bufio = self.tp(rawio, 5)
1075 self.assertRaises(IOError, bufio.seek, 0)
1076 self.assertRaises(IOError, bufio.tell)
1077 self.assertRaises(IOError, bufio.write, b"abcdef")
1078
Benjamin Peterson59406a92009-03-26 17:10:29 +00001079 def test_max_buffer_size_deprecation(self):
1080 with support.check_warnings() as w:
1081 warnings.simplefilter("always", DeprecationWarning)
1082 self.tp(self.MockRawIO(), 8, 12)
1083 self.assertEqual(len(w.warnings), 1)
1084 warning = w.warnings[0]
1085 self.assertTrue(warning.category is DeprecationWarning)
1086 self.assertEqual(str(warning.message),
1087 "max_buffer_size is deprecated")
1088
1089
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001090class CBufferedWriterTest(BufferedWriterTest):
1091 tp = io.BufferedWriter
1092
1093 def test_constructor(self):
1094 BufferedWriterTest.test_constructor(self)
1095 # The allocation can succeed on 32-bit builds, e.g. with more
1096 # than 2GB RAM and a 64-bit kernel.
1097 if sys.maxsize > 0x7FFFFFFF:
1098 rawio = self.MockRawIO()
1099 bufio = self.tp(rawio)
1100 self.assertRaises((OverflowError, MemoryError, ValueError),
1101 bufio.__init__, rawio, sys.maxsize)
1102
1103 def test_initialization(self):
1104 rawio = self.MockRawIO()
1105 bufio = self.tp(rawio)
1106 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1107 self.assertRaises(ValueError, bufio.write, b"def")
1108 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1109 self.assertRaises(ValueError, bufio.write, b"def")
1110 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1111 self.assertRaises(ValueError, bufio.write, b"def")
1112
1113 def test_garbage_collection(self):
1114 # C BufferedWriter objects are collected, and collecting them flushes
1115 # all data to disk.
1116 # The Python version has __del__, so it ends into gc.garbage instead
1117 rawio = self.FileIO(support.TESTFN, "w+b")
1118 f = self.tp(rawio)
1119 f.write(b"123xxx")
1120 f.x = f
1121 wr = weakref.ref(f)
1122 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001123 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +00001124 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001125 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001126 self.assertEqual(f.read(), b"123xxx")
1127
1128
1129class PyBufferedWriterTest(BufferedWriterTest):
1130 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001131
Guido van Rossum01a27522007-03-07 01:00:12 +00001132class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001133
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001134 def test_constructor(self):
1135 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001136 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001137
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001138 def test_detach(self):
1139 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1140 self.assertRaises(self.UnsupportedOperation, pair.detach)
1141
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001142 def test_constructor_max_buffer_size_deprecation(self):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001143 with support.check_warnings() as w:
1144 warnings.simplefilter("always", DeprecationWarning)
1145 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1146 self.assertEqual(len(w.warnings), 1)
1147 warning = w.warnings[0]
1148 self.assertTrue(warning.category is DeprecationWarning)
1149 self.assertEqual(str(warning.message),
1150 "max_buffer_size is deprecated")
1151
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001152 def test_constructor_with_not_readable(self):
1153 class NotReadable(MockRawIO):
1154 def readable(self):
1155 return False
1156
1157 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1158
1159 def test_constructor_with_not_writeable(self):
1160 class NotWriteable(MockRawIO):
1161 def writable(self):
1162 return False
1163
1164 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1165
1166 def test_read(self):
1167 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1168
1169 self.assertEqual(pair.read(3), b"abc")
1170 self.assertEqual(pair.read(1), b"d")
1171 self.assertEqual(pair.read(), b"ef")
Benjamin Peterson6b59f772009-12-13 19:30:15 +00001172 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1173 self.assertEqual(pair.read(None), b"abc")
1174
1175 def test_readlines(self):
1176 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1177 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1178 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1179 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001180
1181 def test_read1(self):
1182 # .read1() is delegated to the underlying reader object, so this test
1183 # can be shallow.
1184 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1185
1186 self.assertEqual(pair.read1(3), b"abc")
1187
1188 def test_readinto(self):
1189 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1190
1191 data = bytearray(5)
1192 self.assertEqual(pair.readinto(data), 5)
1193 self.assertEqual(data, b"abcde")
1194
1195 def test_write(self):
1196 w = self.MockRawIO()
1197 pair = self.tp(self.MockRawIO(), w)
1198
1199 pair.write(b"abc")
1200 pair.flush()
1201 pair.write(b"def")
1202 pair.flush()
1203 self.assertEqual(w._write_stack, [b"abc", b"def"])
1204
1205 def test_peek(self):
1206 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1207
1208 self.assertTrue(pair.peek(3).startswith(b"abc"))
1209 self.assertEqual(pair.read(3), b"abc")
1210
1211 def test_readable(self):
1212 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1213 self.assertTrue(pair.readable())
1214
1215 def test_writeable(self):
1216 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1217 self.assertTrue(pair.writable())
1218
1219 def test_seekable(self):
1220 # BufferedRWPairs are never seekable, even if their readers and writers
1221 # are.
1222 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1223 self.assertFalse(pair.seekable())
1224
1225 # .flush() is delegated to the underlying writer object and has been
1226 # tested in the test_write method.
1227
1228 def test_close_and_closed(self):
1229 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1230 self.assertFalse(pair.closed)
1231 pair.close()
1232 self.assertTrue(pair.closed)
1233
1234 def test_isatty(self):
1235 class SelectableIsAtty(MockRawIO):
1236 def __init__(self, isatty):
1237 MockRawIO.__init__(self)
1238 self._isatty = isatty
1239
1240 def isatty(self):
1241 return self._isatty
1242
1243 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1244 self.assertFalse(pair.isatty())
1245
1246 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1247 self.assertTrue(pair.isatty())
1248
1249 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1250 self.assertTrue(pair.isatty())
1251
1252 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1253 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001254
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001255class CBufferedRWPairTest(BufferedRWPairTest):
1256 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001257
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001258class PyBufferedRWPairTest(BufferedRWPairTest):
1259 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001260
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001261
1262class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1263 read_mode = "rb+"
1264 write_mode = "wb+"
1265
1266 def test_constructor(self):
1267 BufferedReaderTest.test_constructor(self)
1268 BufferedWriterTest.test_constructor(self)
1269
1270 def test_read_and_write(self):
1271 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001272 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001273
1274 self.assertEqual(b"as", rw.read(2))
1275 rw.write(b"ddd")
1276 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001277 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001278 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001279 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001280
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001281 def test_seek_and_tell(self):
1282 raw = self.BytesIO(b"asdfghjkl")
1283 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001284
1285 self.assertEquals(b"as", rw.read(2))
1286 self.assertEquals(2, rw.tell())
1287 rw.seek(0, 0)
1288 self.assertEquals(b"asdf", rw.read(4))
1289
1290 rw.write(b"asdf")
1291 rw.seek(0, 0)
1292 self.assertEquals(b"asdfasdfl", rw.read())
1293 self.assertEquals(9, rw.tell())
1294 rw.seek(-4, 2)
1295 self.assertEquals(5, rw.tell())
1296 rw.seek(2, 1)
1297 self.assertEquals(7, rw.tell())
1298 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001299 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001300
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001301 def check_flush_and_read(self, read_func):
1302 raw = self.BytesIO(b"abcdefghi")
1303 bufio = self.tp(raw)
1304
1305 self.assertEquals(b"ab", read_func(bufio, 2))
1306 bufio.write(b"12")
1307 self.assertEquals(b"ef", read_func(bufio, 2))
1308 self.assertEquals(6, bufio.tell())
1309 bufio.flush()
1310 self.assertEquals(6, bufio.tell())
1311 self.assertEquals(b"ghi", read_func(bufio))
1312 raw.seek(0, 0)
1313 raw.write(b"XYZ")
1314 # flush() resets the read buffer
1315 bufio.flush()
1316 bufio.seek(0, 0)
1317 self.assertEquals(b"XYZ", read_func(bufio, 3))
1318
1319 def test_flush_and_read(self):
1320 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1321
1322 def test_flush_and_readinto(self):
1323 def _readinto(bufio, n=-1):
1324 b = bytearray(n if n >= 0 else 9999)
1325 n = bufio.readinto(b)
1326 return bytes(b[:n])
1327 self.check_flush_and_read(_readinto)
1328
1329 def test_flush_and_peek(self):
1330 def _peek(bufio, n=-1):
1331 # This relies on the fact that the buffer can contain the whole
1332 # raw stream, otherwise peek() can return less.
1333 b = bufio.peek(n)
1334 if n != -1:
1335 b = b[:n]
1336 bufio.seek(len(b), 1)
1337 return b
1338 self.check_flush_and_read(_peek)
1339
1340 def test_flush_and_write(self):
1341 raw = self.BytesIO(b"abcdefghi")
1342 bufio = self.tp(raw)
1343
1344 bufio.write(b"123")
1345 bufio.flush()
1346 bufio.write(b"45")
1347 bufio.flush()
1348 bufio.seek(0, 0)
1349 self.assertEquals(b"12345fghi", raw.getvalue())
1350 self.assertEquals(b"12345fghi", bufio.read())
1351
1352 def test_threads(self):
1353 BufferedReaderTest.test_threads(self)
1354 BufferedWriterTest.test_threads(self)
1355
1356 def test_writes_and_peek(self):
1357 def _peek(bufio):
1358 bufio.peek(1)
1359 self.check_writes(_peek)
1360 def _peek(bufio):
1361 pos = bufio.tell()
1362 bufio.seek(-1, 1)
1363 bufio.peek(1)
1364 bufio.seek(pos, 0)
1365 self.check_writes(_peek)
1366
1367 def test_writes_and_reads(self):
1368 def _read(bufio):
1369 bufio.seek(-1, 1)
1370 bufio.read(1)
1371 self.check_writes(_read)
1372
1373 def test_writes_and_read1s(self):
1374 def _read1(bufio):
1375 bufio.seek(-1, 1)
1376 bufio.read1(1)
1377 self.check_writes(_read1)
1378
1379 def test_writes_and_readintos(self):
1380 def _read(bufio):
1381 bufio.seek(-1, 1)
1382 bufio.readinto(bytearray(1))
1383 self.check_writes(_read)
1384
Antoine Pitrou0473e562009-08-06 20:52:43 +00001385 def test_write_after_readahead(self):
1386 # Issue #6629: writing after the buffer was filled by readahead should
1387 # first rewind the raw stream.
1388 for overwrite_size in [1, 5]:
1389 raw = self.BytesIO(b"A" * 10)
1390 bufio = self.tp(raw, 4)
1391 # Trigger readahead
1392 self.assertEqual(bufio.read(1), b"A")
1393 self.assertEqual(bufio.tell(), 1)
1394 # Overwriting should rewind the raw stream if it needs so
1395 bufio.write(b"B" * overwrite_size)
1396 self.assertEqual(bufio.tell(), overwrite_size + 1)
1397 # If the write size was smaller than the buffer size, flush() and
1398 # check that rewind happens.
1399 bufio.flush()
1400 self.assertEqual(bufio.tell(), overwrite_size + 1)
1401 s = raw.getvalue()
1402 self.assertEqual(s,
1403 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1404
Antoine Pitrou66f9fea2010-01-31 23:20:26 +00001405 def test_truncate_after_read_or_write(self):
1406 raw = self.BytesIO(b"A" * 10)
1407 bufio = self.tp(raw, 100)
1408 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1409 self.assertEqual(bufio.truncate(), 2)
1410 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1411 self.assertEqual(bufio.truncate(), 4)
1412
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001413 def test_misbehaved_io(self):
1414 BufferedReaderTest.test_misbehaved_io(self)
1415 BufferedWriterTest.test_misbehaved_io(self)
1416
1417class CBufferedRandomTest(BufferedRandomTest):
1418 tp = io.BufferedRandom
1419
1420 def test_constructor(self):
1421 BufferedRandomTest.test_constructor(self)
1422 # The allocation can succeed on 32-bit builds, e.g. with more
1423 # than 2GB RAM and a 64-bit kernel.
1424 if sys.maxsize > 0x7FFFFFFF:
1425 rawio = self.MockRawIO()
1426 bufio = self.tp(rawio)
1427 self.assertRaises((OverflowError, MemoryError, ValueError),
1428 bufio.__init__, rawio, sys.maxsize)
1429
1430 def test_garbage_collection(self):
1431 CBufferedReaderTest.test_garbage_collection(self)
1432 CBufferedWriterTest.test_garbage_collection(self)
1433
1434class PyBufferedRandomTest(BufferedRandomTest):
1435 tp = pyio.BufferedRandom
1436
1437
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001438# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1439# properties:
1440# - A single output character can correspond to many bytes of input.
1441# - The number of input bytes to complete the character can be
1442# undetermined until the last input byte is received.
1443# - The number of input bytes can vary depending on previous input.
1444# - A single input byte can correspond to many characters of output.
1445# - The number of output characters can be undetermined until the
1446# last input byte is received.
1447# - The number of output characters can vary depending on previous input.
1448
1449class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1450 """
1451 For testing seek/tell behavior with a stateful, buffering decoder.
1452
1453 Input is a sequence of words. Words may be fixed-length (length set
1454 by input) or variable-length (period-terminated). In variable-length
1455 mode, extra periods are ignored. Possible words are:
1456 - 'i' followed by a number sets the input length, I (maximum 99).
1457 When I is set to 0, words are space-terminated.
1458 - 'o' followed by a number sets the output length, O (maximum 99).
1459 - Any other word is converted into a word followed by a period on
1460 the output. The output word consists of the input word truncated
1461 or padded out with hyphens to make its length equal to O. If O
1462 is 0, the word is output verbatim without truncating or padding.
1463 I and O are initially set to 1. When I changes, any buffered input is
1464 re-scanned according to the new I. EOF also terminates the last word.
1465 """
1466
1467 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001468 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001469 self.reset()
1470
1471 def __repr__(self):
1472 return '<SID %x>' % id(self)
1473
1474 def reset(self):
1475 self.i = 1
1476 self.o = 1
1477 self.buffer = bytearray()
1478
1479 def getstate(self):
1480 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1481 return bytes(self.buffer), i*100 + o
1482
1483 def setstate(self, state):
1484 buffer, io = state
1485 self.buffer = bytearray(buffer)
1486 i, o = divmod(io, 100)
1487 self.i, self.o = i ^ 1, o ^ 1
1488
1489 def decode(self, input, final=False):
1490 output = ''
1491 for b in input:
1492 if self.i == 0: # variable-length, terminated with period
1493 if b == ord('.'):
1494 if self.buffer:
1495 output += self.process_word()
1496 else:
1497 self.buffer.append(b)
1498 else: # fixed-length, terminate after self.i bytes
1499 self.buffer.append(b)
1500 if len(self.buffer) == self.i:
1501 output += self.process_word()
1502 if final and self.buffer: # EOF terminates the last word
1503 output += self.process_word()
1504 return output
1505
1506 def process_word(self):
1507 output = ''
1508 if self.buffer[0] == ord('i'):
1509 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1510 elif self.buffer[0] == ord('o'):
1511 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1512 else:
1513 output = self.buffer.decode('ascii')
1514 if len(output) < self.o:
1515 output += '-'*self.o # pad out with hyphens
1516 if self.o:
1517 output = output[:self.o] # truncate to output length
1518 output += '.'
1519 self.buffer = bytearray()
1520 return output
1521
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001522 codecEnabled = False
1523
1524 @classmethod
1525 def lookupTestDecoder(cls, name):
1526 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001527 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001528 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001529 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001530 incrementalencoder=None,
1531 streamreader=None, streamwriter=None,
1532 incrementaldecoder=cls)
1533
1534# Register the previous decoder for testing.
1535# Disabled by default, tests will enable it.
1536codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1537
1538
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001539class StatefulIncrementalDecoderTest(unittest.TestCase):
1540 """
1541 Make sure the StatefulIncrementalDecoder actually works.
1542 """
1543
1544 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001545 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001546 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001547 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001548 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001549 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001550 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001551 # I=0, O=6 (variable-length input, fixed-length output)
1552 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1553 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001554 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001555 # I=6, O=3 (fixed-length input > fixed-length output)
1556 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1557 # I=0, then 3; O=29, then 15 (with longer output)
1558 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1559 'a----------------------------.' +
1560 'b----------------------------.' +
1561 'cde--------------------------.' +
1562 'abcdefghijabcde.' +
1563 'a.b------------.' +
1564 '.c.------------.' +
1565 'd.e------------.' +
1566 'k--------------.' +
1567 'l--------------.' +
1568 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001569 ]
1570
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001571 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001572 # Try a few one-shot test cases.
1573 for input, eof, output in self.test_cases:
1574 d = StatefulIncrementalDecoder()
1575 self.assertEquals(d.decode(input, eof), output)
1576
1577 # Also test an unfinished decode, followed by forcing EOF.
1578 d = StatefulIncrementalDecoder()
1579 self.assertEquals(d.decode(b'oiabcd'), '')
1580 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001581
1582class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001583
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001584 def setUp(self):
1585 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1586 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001587 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001588
Guido van Rossumd0712812007-04-11 16:32:43 +00001589 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001590 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001591
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001592 def test_constructor(self):
1593 r = self.BytesIO(b"\xc3\xa9\n\n")
1594 b = self.BufferedReader(r, 1000)
1595 t = self.TextIOWrapper(b)
1596 t.__init__(b, encoding="latin1", newline="\r\n")
1597 self.assertEquals(t.encoding, "latin1")
1598 self.assertEquals(t.line_buffering, False)
1599 t.__init__(b, encoding="utf8", line_buffering=True)
1600 self.assertEquals(t.encoding, "utf8")
1601 self.assertEquals(t.line_buffering, True)
1602 self.assertEquals("\xe9\n", t.readline())
1603 self.assertRaises(TypeError, t.__init__, b, newline=42)
1604 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1605
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001606 def test_detach(self):
1607 r = self.BytesIO()
1608 b = self.BufferedWriter(r)
1609 t = self.TextIOWrapper(b)
1610 self.assertIs(t.detach(), b)
1611
1612 t = self.TextIOWrapper(b, encoding="ascii")
1613 t.write("howdy")
1614 self.assertFalse(r.getvalue())
1615 t.detach()
1616 self.assertEqual(r.getvalue(), b"howdy")
1617 self.assertRaises(ValueError, t.detach)
1618
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001619 def test_repr(self):
1620 raw = self.BytesIO("hello".encode("utf-8"))
1621 b = self.BufferedReader(raw)
1622 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001623 modname = self.TextIOWrapper.__module__
1624 self.assertEqual(repr(t),
1625 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1626 raw.name = "dummy"
1627 self.assertEqual(repr(t),
1628 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1629 raw.name = b"dummy"
1630 self.assertEqual(repr(t),
1631 "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001632
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001633 def test_line_buffering(self):
1634 r = self.BytesIO()
1635 b = self.BufferedWriter(r, 1000)
1636 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001637 t.write("X")
1638 self.assertEquals(r.getvalue(), b"") # No flush happened
1639 t.write("Y\nZ")
1640 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1641 t.write("A\rB")
1642 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1643
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001644 def test_encoding(self):
1645 # Check the encoding attribute is always set, and valid
1646 b = self.BytesIO()
1647 t = self.TextIOWrapper(b, encoding="utf8")
1648 self.assertEqual(t.encoding, "utf8")
1649 t = self.TextIOWrapper(b)
Georg Brandlab91fde2009-08-13 08:51:18 +00001650 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001651 codecs.lookup(t.encoding)
1652
1653 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001654 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001655 b = self.BytesIO(b"abc\n\xff\n")
1656 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001657 self.assertRaises(UnicodeError, t.read)
1658 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001659 b = self.BytesIO(b"abc\n\xff\n")
1660 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001661 self.assertRaises(UnicodeError, t.read)
1662 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001663 b = self.BytesIO(b"abc\n\xff\n")
1664 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001665 self.assertEquals(t.read(), "abc\n\n")
1666 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001667 b = self.BytesIO(b"abc\n\xff\n")
1668 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001669 self.assertEquals(t.read(), "abc\n\ufffd\n")
1670
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001671 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001672 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001673 b = self.BytesIO()
1674 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001675 self.assertRaises(UnicodeError, t.write, "\xff")
1676 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001677 b = self.BytesIO()
1678 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001679 self.assertRaises(UnicodeError, t.write, "\xff")
1680 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001681 b = self.BytesIO()
1682 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001683 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001684 t.write("abc\xffdef\n")
1685 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001686 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001687 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001688 b = self.BytesIO()
1689 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001690 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001691 t.write("abc\xffdef\n")
1692 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001693 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001694
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001695 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001696 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1697
1698 tests = [
1699 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001700 [ '', input_lines ],
1701 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1702 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1703 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001704 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001705 encodings = (
1706 'utf-8', 'latin-1',
1707 'utf-16', 'utf-16-le', 'utf-16-be',
1708 'utf-32', 'utf-32-le', 'utf-32-be',
1709 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001710
Guido van Rossum8358db22007-08-18 21:39:55 +00001711 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001712 # character in TextIOWrapper._pending_line.
1713 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001714 # XXX: str.encode() should return bytes
1715 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001716 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001717 for bufsize in range(1, 10):
1718 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001719 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1720 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001721 encoding=encoding)
1722 if do_reads:
1723 got_lines = []
1724 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001725 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001726 if c2 == '':
1727 break
1728 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001729 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001730 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001731 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001732
1733 for got_line, exp_line in zip(got_lines, exp_lines):
1734 self.assertEquals(got_line, exp_line)
1735 self.assertEquals(len(got_lines), len(exp_lines))
1736
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001737 def test_newlines_input(self):
1738 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001739 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1740 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001741 (None, normalized.decode("ascii").splitlines(True)),
1742 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001743 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1744 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1745 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001746 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001747 buf = self.BytesIO(testdata)
1748 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001749 self.assertEquals(txt.readlines(), expected)
1750 txt.seek(0)
1751 self.assertEquals(txt.read(), "".join(expected))
1752
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001753 def test_newlines_output(self):
1754 testdict = {
1755 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1756 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1757 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1758 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1759 }
1760 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1761 for newline, expected in tests:
1762 buf = self.BytesIO()
1763 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1764 txt.write("AAA\nB")
1765 txt.write("BB\nCCC\n")
1766 txt.write("X\rY\r\nZ")
1767 txt.flush()
1768 self.assertEquals(buf.closed, False)
1769 self.assertEquals(buf.getvalue(), expected)
1770
1771 def test_destructor(self):
1772 l = []
1773 base = self.BytesIO
1774 class MyBytesIO(base):
1775 def close(self):
1776 l.append(self.getvalue())
1777 base.close(self)
1778 b = MyBytesIO()
1779 t = self.TextIOWrapper(b, encoding="ascii")
1780 t.write("abc")
1781 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001782 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001783 self.assertEquals([b"abc"], l)
1784
1785 def test_override_destructor(self):
1786 record = []
1787 class MyTextIO(self.TextIOWrapper):
1788 def __del__(self):
1789 record.append(1)
1790 try:
1791 f = super().__del__
1792 except AttributeError:
1793 pass
1794 else:
1795 f()
1796 def close(self):
1797 record.append(2)
1798 super().close()
1799 def flush(self):
1800 record.append(3)
1801 super().flush()
1802 b = self.BytesIO()
1803 t = MyTextIO(b, encoding="ascii")
1804 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001805 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001806 self.assertEqual(record, [1, 2, 3])
1807
1808 def test_error_through_destructor(self):
1809 # Test that the exception state is not modified by a destructor,
1810 # even if close() fails.
1811 rawio = self.CloseFailureIO()
1812 def f():
1813 self.TextIOWrapper(rawio).xyzzy
1814 with support.captured_output("stderr") as s:
1815 self.assertRaises(AttributeError, f)
1816 s = s.getvalue().strip()
1817 if s:
1818 # The destructor *may* have printed an unraisable error, check it
1819 self.assertEqual(len(s.splitlines()), 1)
Georg Brandlab91fde2009-08-13 08:51:18 +00001820 self.assertTrue(s.startswith("Exception IOError: "), s)
1821 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001822
Guido van Rossum9b76da62007-04-11 01:09:03 +00001823 # Systematic tests of the text I/O API
1824
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001825 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001826 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1827 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001828 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001829 f._CHUNK_SIZE = chunksize
1830 self.assertEquals(f.write("abc"), 3)
1831 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001832 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001833 f._CHUNK_SIZE = chunksize
1834 self.assertEquals(f.tell(), 0)
1835 self.assertEquals(f.read(), "abc")
1836 cookie = f.tell()
1837 self.assertEquals(f.seek(0), 0)
Benjamin Peterson6b59f772009-12-13 19:30:15 +00001838 self.assertEquals(f.read(None), "abc")
1839 f.seek(0)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001840 self.assertEquals(f.read(2), "ab")
1841 self.assertEquals(f.read(1), "c")
1842 self.assertEquals(f.read(1), "")
1843 self.assertEquals(f.read(), "")
1844 self.assertEquals(f.tell(), cookie)
1845 self.assertEquals(f.seek(0), 0)
1846 self.assertEquals(f.seek(0, 2), cookie)
1847 self.assertEquals(f.write("def"), 3)
1848 self.assertEquals(f.seek(cookie), cookie)
1849 self.assertEquals(f.read(), "def")
1850 if enc.startswith("utf"):
1851 self.multi_line_test(f, enc)
1852 f.close()
1853
1854 def multi_line_test(self, f, enc):
1855 f.seek(0)
1856 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001857 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001858 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001859 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001860 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001861 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001862 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001863 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001864 wlines.append((f.tell(), line))
1865 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001866 f.seek(0)
1867 rlines = []
1868 while True:
1869 pos = f.tell()
1870 line = f.readline()
1871 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001872 break
1873 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +00001874 self.assertEquals(rlines, wlines)
1875
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001876 def test_telling(self):
1877 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001878 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001879 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001880 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001881 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001882 p2 = f.tell()
1883 f.seek(0)
1884 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001885 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001886 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001887 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001888 self.assertEquals(f.tell(), p2)
1889 f.seek(0)
1890 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001891 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001892 self.assertRaises(IOError, f.tell)
1893 self.assertEquals(f.tell(), p2)
1894 f.close()
1895
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001896 def test_seeking(self):
1897 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001898 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001899 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001900 prefix = bytes(u_prefix.encode("utf-8"))
1901 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001902 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001903 suffix = bytes(u_suffix.encode("utf-8"))
1904 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001905 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001906 f.write(line*2)
1907 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001908 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001909 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001910 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001911 self.assertEquals(f.tell(), prefix_size)
1912 self.assertEquals(f.readline(), u_suffix)
1913
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001914 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00001915 # Regression test for a specific bug
1916 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001917 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001918 f.write(data)
1919 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001920 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001921 f._CHUNK_SIZE # Just test that it exists
1922 f._CHUNK_SIZE = 2
1923 f.readline()
1924 f.tell()
1925
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001926 def test_seek_and_tell(self):
1927 #Test seek/tell using the StatefulIncrementalDecoder.
1928 # Make test faster by doing smaller seeks
1929 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001930
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001931 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001932 """Tell/seek to various points within a data stream and ensure
1933 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001934 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001935 f.write(data)
1936 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001937 f = self.open(support.TESTFN, encoding='test_decoder')
1938 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001939 decoded = f.read()
1940 f.close()
1941
Neal Norwitze2b07052008-03-18 19:52:05 +00001942 for i in range(min_pos, len(decoded) + 1): # seek positions
1943 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001944 f = self.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001945 self.assertEquals(f.read(i), decoded[:i])
1946 cookie = f.tell()
1947 self.assertEquals(f.read(j), decoded[i:i + j])
1948 f.seek(cookie)
1949 self.assertEquals(f.read(), decoded[i:])
1950 f.close()
1951
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001952 # Enable the test decoder.
1953 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001954
1955 # Run the tests.
1956 try:
1957 # Try each test case.
1958 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001959 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001960
1961 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001962 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1963 offset = CHUNK_SIZE - len(input)//2
1964 prefix = b'.'*offset
1965 # Don't bother seeking into the prefix (takes too long).
1966 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001967 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001968
1969 # Ensure our test decoder won't interfere with subsequent tests.
1970 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001971 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001972
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001973 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001974 data = "1234567890"
1975 tests = ("utf-16",
1976 "utf-16-le",
1977 "utf-16-be",
1978 "utf-32",
1979 "utf-32-le",
1980 "utf-32-be")
1981 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001982 buf = self.BytesIO()
1983 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001984 # Check if the BOM is written only once (see issue1753).
1985 f.write(data)
1986 f.write(data)
1987 f.seek(0)
1988 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00001989 f.seek(0)
1990 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001991 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1992
Benjamin Petersona1b49012009-03-31 23:11:32 +00001993 def test_unreadable(self):
1994 class UnReadable(self.BytesIO):
1995 def readable(self):
1996 return False
1997 txt = self.TextIOWrapper(UnReadable())
1998 self.assertRaises(IOError, txt.read)
1999
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002000 def test_read_one_by_one(self):
2001 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002002 reads = ""
2003 while True:
2004 c = txt.read(1)
2005 if not c:
2006 break
2007 reads += c
2008 self.assertEquals(reads, "AA\nBB")
2009
Benjamin Peterson6b59f772009-12-13 19:30:15 +00002010 def test_readlines(self):
2011 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2012 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2013 txt.seek(0)
2014 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2015 txt.seek(0)
2016 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2017
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002018 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002019 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002020 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002021 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002022 reads = ""
2023 while True:
2024 c = txt.read(128)
2025 if not c:
2026 break
2027 reads += c
2028 self.assertEquals(reads, "A"*127+"\nB")
2029
2030 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002031 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002032
2033 # read one char at a time
2034 reads = ""
2035 while True:
2036 c = txt.read(1)
2037 if not c:
2038 break
2039 reads += c
2040 self.assertEquals(reads, self.normalized)
2041
2042 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002043 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002044 txt._CHUNK_SIZE = 4
2045
2046 reads = ""
2047 while True:
2048 c = txt.read(4)
2049 if not c:
2050 break
2051 reads += c
2052 self.assertEquals(reads, self.normalized)
2053
2054 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002055 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002056 txt._CHUNK_SIZE = 4
2057
2058 reads = txt.read(4)
2059 reads += txt.read(4)
2060 reads += txt.readline()
2061 reads += txt.readline()
2062 reads += txt.readline()
2063 self.assertEquals(reads, self.normalized)
2064
2065 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002066 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002067 txt._CHUNK_SIZE = 4
2068
2069 reads = txt.read(4)
2070 reads += txt.read()
2071 self.assertEquals(reads, self.normalized)
2072
2073 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002074 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002075 txt._CHUNK_SIZE = 4
2076
2077 reads = txt.read(4)
2078 pos = txt.tell()
2079 txt.seek(0)
2080 txt.seek(pos)
2081 self.assertEquals(txt.read(4), "BBB\n")
2082
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002083 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002084 buffer = self.BytesIO(self.testdata)
2085 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002086
2087 self.assertEqual(buffer.seekable(), txt.seekable())
2088
Antoine Pitroue4501852009-05-14 18:55:55 +00002089 def test_append_bom(self):
2090 # The BOM is not written again when appending to a non-empty file
2091 filename = support.TESTFN
2092 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2093 with self.open(filename, 'w', encoding=charset) as f:
2094 f.write('aaa')
2095 pos = f.tell()
2096 with self.open(filename, 'rb') as f:
2097 self.assertEquals(f.read(), 'aaa'.encode(charset))
2098
2099 with self.open(filename, 'a', encoding=charset) as f:
2100 f.write('xxx')
2101 with self.open(filename, 'rb') as f:
2102 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
2103
2104 def test_seek_bom(self):
2105 # Same test, but when seeking manually
2106 filename = support.TESTFN
2107 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2108 with self.open(filename, 'w', encoding=charset) as f:
2109 f.write('aaa')
2110 pos = f.tell()
2111 with self.open(filename, 'r+', encoding=charset) as f:
2112 f.seek(pos)
2113 f.write('zzz')
2114 f.seek(0)
2115 f.write('bbb')
2116 with self.open(filename, 'rb') as f:
2117 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2118
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002119 def test_errors_property(self):
2120 with self.open(support.TESTFN, "w") as f:
2121 self.assertEqual(f.errors, "strict")
2122 with self.open(support.TESTFN, "w", errors="replace") as f:
2123 self.assertEqual(f.errors, "replace")
2124
Antoine Pitroue4501852009-05-14 18:55:55 +00002125
Amaury Forgeot d'Arcaf0312a2009-08-29 23:19:16 +00002126 def test_threads_write(self):
2127 # Issue6750: concurrent writes could duplicate data
2128 event = threading.Event()
2129 with self.open(support.TESTFN, "w", buffering=1) as f:
2130 def run(n):
2131 text = "Thread%03d\n" % n
2132 event.wait()
2133 f.write(text)
2134 threads = [threading.Thread(target=lambda n=x: run(n))
2135 for x in range(20)]
2136 for t in threads:
2137 t.start()
2138 time.sleep(0.02)
2139 event.set()
2140 for t in threads:
2141 t.join()
2142 with self.open(support.TESTFN) as f:
2143 content = f.read()
2144 for n in range(20):
2145 self.assertEquals(content.count("Thread%03d\n" % n), 1)
2146
Antoine Pitroufaf90072010-05-03 16:58:19 +00002147 def test_flush_error_on_close(self):
2148 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2149 def bad_flush():
2150 raise IOError()
2151 txt.flush = bad_flush
2152 self.assertRaises(IOError, txt.close) # exception not swallowed
2153
2154 def test_multi_close(self):
2155 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2156 txt.close()
2157 txt.close()
2158 txt.close()
2159 self.assertRaises(ValueError, txt.flush)
2160
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002161class CTextIOWrapperTest(TextIOWrapperTest):
2162
2163 def test_initialization(self):
2164 r = self.BytesIO(b"\xc3\xa9\n\n")
2165 b = self.BufferedReader(r, 1000)
2166 t = self.TextIOWrapper(b)
2167 self.assertRaises(TypeError, t.__init__, b, newline=42)
2168 self.assertRaises(ValueError, t.read)
2169 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2170 self.assertRaises(ValueError, t.read)
2171
2172 def test_garbage_collection(self):
2173 # C TextIOWrapper objects are collected, and collecting them flushes
2174 # all data to disk.
2175 # The Python version has __del__, so it ends in gc.garbage instead.
2176 rawio = io.FileIO(support.TESTFN, "wb")
2177 b = self.BufferedWriter(rawio)
2178 t = self.TextIOWrapper(b, encoding="ascii")
2179 t.write("456def")
2180 t.x = t
2181 wr = weakref.ref(t)
2182 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002183 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +00002184 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002185 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002186 self.assertEqual(f.read(), b"456def")
2187
2188class PyTextIOWrapperTest(TextIOWrapperTest):
2189 pass
2190
2191
2192class IncrementalNewlineDecoderTest(unittest.TestCase):
2193
2194 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002195 # UTF-8 specific tests for a newline decoder
2196 def _check_decode(b, s, **kwargs):
2197 # We exercise getstate() / setstate() as well as decode()
2198 state = decoder.getstate()
2199 self.assertEquals(decoder.decode(b, **kwargs), s)
2200 decoder.setstate(state)
2201 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002202
Antoine Pitrou180a3362008-12-14 16:36:46 +00002203 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002204
Antoine Pitrou180a3362008-12-14 16:36:46 +00002205 _check_decode(b'\xe8', "")
2206 _check_decode(b'\xa2', "")
2207 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002208
Antoine Pitrou180a3362008-12-14 16:36:46 +00002209 _check_decode(b'\xe8', "")
2210 _check_decode(b'\xa2', "")
2211 _check_decode(b'\x88', "\u8888")
2212
2213 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002214 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2215
Antoine Pitrou180a3362008-12-14 16:36:46 +00002216 decoder.reset()
2217 _check_decode(b'\n', "\n")
2218 _check_decode(b'\r', "")
2219 _check_decode(b'', "\n", final=True)
2220 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002221
Antoine Pitrou180a3362008-12-14 16:36:46 +00002222 _check_decode(b'\r', "")
2223 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002224
Antoine Pitrou180a3362008-12-14 16:36:46 +00002225 _check_decode(b'\r\r\n', "\n\n")
2226 _check_decode(b'\r', "")
2227 _check_decode(b'\r', "\n")
2228 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002229
Antoine Pitrou180a3362008-12-14 16:36:46 +00002230 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2231 _check_decode(b'\xe8\xa2\x88', "\u8888")
2232 _check_decode(b'\n', "\n")
2233 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2234 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002235
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002236 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002237 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002238 if encoding is not None:
2239 encoder = codecs.getincrementalencoder(encoding)()
2240 def _decode_bytewise(s):
2241 # Decode one byte at a time
2242 for b in encoder.encode(s):
2243 result.append(decoder.decode(bytes([b])))
2244 else:
2245 encoder = None
2246 def _decode_bytewise(s):
2247 # Decode one char at a time
2248 for c in s:
2249 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002250 self.assertEquals(decoder.newlines, None)
2251 _decode_bytewise("abc\n\r")
2252 self.assertEquals(decoder.newlines, '\n')
2253 _decode_bytewise("\nabc")
2254 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2255 _decode_bytewise("abc\r")
2256 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2257 _decode_bytewise("abc")
2258 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2259 _decode_bytewise("abc\r")
2260 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2261 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002262 input = "abc"
2263 if encoder is not None:
2264 encoder.reset()
2265 input = encoder.encode(input)
2266 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002267 self.assertEquals(decoder.newlines, None)
2268
2269 def test_newline_decoder(self):
2270 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002271 # None meaning the IncrementalNewlineDecoder takes unicode input
2272 # rather than bytes input
2273 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002274 'utf-16', 'utf-16-le', 'utf-16-be',
2275 'utf-32', 'utf-32-le', 'utf-32-be',
2276 )
2277 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002278 decoder = enc and codecs.getincrementaldecoder(enc)()
2279 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2280 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002281 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002282 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2283 self.check_newline_decoding_utf8(decoder)
2284
Antoine Pitrou66913e22009-03-06 23:40:56 +00002285 def test_newline_bytes(self):
2286 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2287 def _check(dec):
2288 self.assertEquals(dec.newlines, None)
2289 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2290 self.assertEquals(dec.newlines, None)
2291 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2292 self.assertEquals(dec.newlines, None)
2293 dec = self.IncrementalNewlineDecoder(None, translate=False)
2294 _check(dec)
2295 dec = self.IncrementalNewlineDecoder(None, translate=True)
2296 _check(dec)
2297
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002298class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2299 pass
2300
2301class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2302 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002303
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002304
Guido van Rossum01a27522007-03-07 01:00:12 +00002305# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002306
Guido van Rossum5abbf752007-08-27 17:39:33 +00002307class MiscIOTest(unittest.TestCase):
2308
Barry Warsaw40e82462008-11-20 20:14:50 +00002309 def tearDown(self):
2310 support.unlink(support.TESTFN)
2311
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002312 def test___all__(self):
2313 for name in self.io.__all__:
2314 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002315 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002316 if name == "open":
2317 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002318 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002319 self.assertTrue(issubclass(obj, Exception), name)
2320 elif not name.startswith("SEEK_"):
2321 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002322
Barry Warsaw40e82462008-11-20 20:14:50 +00002323 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002324 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002325 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002326 f.close()
2327
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002328 f = self.open(support.TESTFN, "U")
Barry Warsaw40e82462008-11-20 20:14:50 +00002329 self.assertEquals(f.name, support.TESTFN)
2330 self.assertEquals(f.buffer.name, support.TESTFN)
2331 self.assertEquals(f.buffer.raw.name, support.TESTFN)
2332 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002333 self.assertEquals(f.buffer.mode, "rb")
2334 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002335 f.close()
2336
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002337 f = self.open(support.TESTFN, "w+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002338 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002339 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2340 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002341
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002342 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002343 self.assertEquals(g.mode, "wb")
2344 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002345 self.assertEquals(g.name, f.fileno())
2346 self.assertEquals(g.raw.name, f.fileno())
2347 f.close()
2348 g.close()
2349
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002350 def test_io_after_close(self):
2351 for kwargs in [
2352 {"mode": "w"},
2353 {"mode": "wb"},
2354 {"mode": "w", "buffering": 1},
2355 {"mode": "w", "buffering": 2},
2356 {"mode": "wb", "buffering": 0},
2357 {"mode": "r"},
2358 {"mode": "rb"},
2359 {"mode": "r", "buffering": 1},
2360 {"mode": "r", "buffering": 2},
2361 {"mode": "rb", "buffering": 0},
2362 {"mode": "w+"},
2363 {"mode": "w+b"},
2364 {"mode": "w+", "buffering": 1},
2365 {"mode": "w+", "buffering": 2},
2366 {"mode": "w+b", "buffering": 0},
2367 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002368 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002369 f.close()
2370 self.assertRaises(ValueError, f.flush)
2371 self.assertRaises(ValueError, f.fileno)
2372 self.assertRaises(ValueError, f.isatty)
2373 self.assertRaises(ValueError, f.__iter__)
2374 if hasattr(f, "peek"):
2375 self.assertRaises(ValueError, f.peek, 1)
2376 self.assertRaises(ValueError, f.read)
2377 if hasattr(f, "read1"):
2378 self.assertRaises(ValueError, f.read1, 1024)
2379 if hasattr(f, "readinto"):
2380 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2381 self.assertRaises(ValueError, f.readline)
2382 self.assertRaises(ValueError, f.readlines)
2383 self.assertRaises(ValueError, f.seek, 0)
2384 self.assertRaises(ValueError, f.tell)
2385 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002386 self.assertRaises(ValueError, f.write,
2387 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002388 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002389 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002390
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002391 def test_blockingioerror(self):
2392 # Various BlockingIOError issues
2393 self.assertRaises(TypeError, self.BlockingIOError)
2394 self.assertRaises(TypeError, self.BlockingIOError, 1)
2395 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2396 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2397 b = self.BlockingIOError(1, "")
2398 self.assertEqual(b.characters_written, 0)
2399 class C(str):
2400 pass
2401 c = C("")
2402 b = self.BlockingIOError(1, c)
2403 c.b = b
2404 b.c = c
2405 wr = weakref.ref(c)
2406 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002407 support.gc_collect()
Georg Brandlab91fde2009-08-13 08:51:18 +00002408 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002409
2410 def test_abcs(self):
2411 # Test the visible base classes are ABCs.
2412 self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
2413 self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
2414 self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
2415 self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))
2416
2417 def _check_abc_inheritance(self, abcmodule):
2418 with self.open(support.TESTFN, "wb", buffering=0) as f:
2419 self.assertTrue(isinstance(f, abcmodule.IOBase))
2420 self.assertTrue(isinstance(f, abcmodule.RawIOBase))
2421 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2422 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2423 with self.open(support.TESTFN, "wb") as f:
2424 self.assertTrue(isinstance(f, abcmodule.IOBase))
2425 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2426 self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
2427 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2428 with self.open(support.TESTFN, "w") as f:
2429 self.assertTrue(isinstance(f, abcmodule.IOBase))
2430 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2431 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2432 self.assertTrue(isinstance(f, abcmodule.TextIOBase))
2433
2434 def test_abc_inheritance(self):
2435 # Test implementations inherit from their respective ABCs
2436 self._check_abc_inheritance(self)
2437
2438 def test_abc_inheritance_official(self):
2439 # Test implementations inherit from the official ABCs of the
2440 # baseline "io" module.
2441 self._check_abc_inheritance(io)
2442
2443class CMiscIOTest(MiscIOTest):
2444 io = io
2445
2446class PyMiscIOTest(MiscIOTest):
2447 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002448
Guido van Rossum28524c72007-02-27 05:47:44 +00002449def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002450 tests = (CIOTest, PyIOTest,
2451 CBufferedReaderTest, PyBufferedReaderTest,
2452 CBufferedWriterTest, PyBufferedWriterTest,
2453 CBufferedRWPairTest, PyBufferedRWPairTest,
2454 CBufferedRandomTest, PyBufferedRandomTest,
2455 StatefulIncrementalDecoderTest,
2456 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2457 CTextIOWrapperTest, PyTextIOWrapperTest,
2458 CMiscIOTest, PyMiscIOTest,)
2459
2460 # Put the namespaces of the IO module we are testing and some useful mock
2461 # classes in the __dict__ of each test.
2462 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
2463 MockNonBlockWriterIO)
2464 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2465 c_io_ns = {name : getattr(io, name) for name in all_members}
2466 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2467 globs = globals()
2468 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2469 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2470 # Avoid turning open into a bound method.
2471 py_io_ns["open"] = pyio.OpenWrapper
2472 for test in tests:
2473 if test.__name__.startswith("C"):
2474 for name, obj in c_io_ns.items():
2475 setattr(test, name, obj)
2476 elif test.__name__.startswith("Py"):
2477 for name, obj in py_io_ns.items():
2478 setattr(test, name, obj)
2479
2480 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002481
2482if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002483 test_main()