blob: fe1a2b8a623b00b8ab232330d8df90679479a1a6 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
33from itertools import chain, cycle, count
34from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000035from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000036
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000037import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000038import io # C implementation of io
39import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000040
Guido van Rossuma9e20242007-03-08 00:43:48 +000041
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042def _default_chunk_size():
43 """Get the default TextIOWrapper chunk size"""
44 with open(__file__, "r", encoding="latin1") as f:
45 return f._CHUNK_SIZE
46
47
48class MockRawIO:
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000050 def __init__(self, read_stack=()):
51 self._read_stack = list(read_stack)
52 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000053 self._reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000054
55 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000056 self._reads += 1
Guido van Rossum68bbcd22007-02-27 17:19:33 +000057 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000058 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000059 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000060 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000061
Guido van Rossum01a27522007-03-07 01:00:12 +000062 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000063 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000064 return len(b)
65
66 def writable(self):
67 return True
68
Guido van Rossum68bbcd22007-02-27 17:19:33 +000069 def fileno(self):
70 return 42
71
72 def readable(self):
73 return True
74
Guido van Rossum01a27522007-03-07 01:00:12 +000075 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000076 return True
77
Guido van Rossum01a27522007-03-07 01:00:12 +000078 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000079 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000080
81 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000082 return 0 # same comment as above
83
84 def readinto(self, buf):
85 self._reads += 1
86 max_len = len(buf)
87 try:
88 data = self._read_stack[0]
89 except IndexError:
90 return 0
91 if data is None:
92 del self._read_stack[0]
93 return None
94 n = len(data)
95 if len(data) <= max_len:
96 del self._read_stack[0]
97 buf[:n] = data
98 return n
99 else:
100 buf[:] = data[:max_len]
101 self._read_stack[0] = data[max_len:]
102 return max_len
103
104 def truncate(self, pos=None):
105 return pos
106
107class CMockRawIO(MockRawIO, io.RawIOBase):
108 pass
109
110class PyMockRawIO(MockRawIO, pyio.RawIOBase):
111 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000112
Guido van Rossuma9e20242007-03-08 00:43:48 +0000113
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000114class MisbehavedRawIO(MockRawIO):
115 def write(self, b):
116 return super().write(b) * 2
117
118 def read(self, n=None):
119 return super().read(n) * 2
120
121 def seek(self, pos, whence):
122 return -123
123
124 def tell(self):
125 return -456
126
127 def readinto(self, buf):
128 super().readinto(buf)
129 return len(buf) * 5
130
131class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
132 pass
133
134class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
135 pass
136
137
138class CloseFailureIO(MockRawIO):
139 closed = 0
140
141 def close(self):
142 if not self.closed:
143 self.closed = 1
144 raise IOError
145
146class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
147 pass
148
149class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
150 pass
151
152
153class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000154
155 def __init__(self, data):
156 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000157 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000158
159 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000160 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000161 self.read_history.append(None if res is None else len(res))
162 return res
163
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000164 def readinto(self, b):
165 res = super().readinto(b)
166 self.read_history.append(res)
167 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000168
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169class CMockFileIO(MockFileIO, io.BytesIO):
170 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000171
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000172class PyMockFileIO(MockFileIO, pyio.BytesIO):
173 pass
174
175
176class MockNonBlockWriterIO:
177
178 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000179 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000181
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000182 def pop_written(self):
183 s = b"".join(self._write_stack)
184 self._write_stack[:] = []
185 return s
186
187 def block_on(self, char):
188 """Block when a given char is encountered."""
189 self._blocker_char = char
190
191 def readable(self):
192 return True
193
194 def seekable(self):
195 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000196
Guido van Rossum01a27522007-03-07 01:00:12 +0000197 def writable(self):
198 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000200 def write(self, b):
201 b = bytes(b)
202 n = -1
203 if self._blocker_char:
204 try:
205 n = b.index(self._blocker_char)
206 except ValueError:
207 pass
208 else:
209 self._blocker_char = None
210 self._write_stack.append(b[:n])
211 raise self.BlockingIOError(0, "test blocking", n)
212 self._write_stack.append(b)
213 return len(b)
214
215class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
216 BlockingIOError = io.BlockingIOError
217
218class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
219 BlockingIOError = pyio.BlockingIOError
220
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
Guido van Rossum28524c72007-02-27 05:47:44 +0000222class IOTest(unittest.TestCase):
223
Neal Norwitze7789b12008-03-24 06:18:09 +0000224 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000225 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000226
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000227 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000228 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000229
Guido van Rossum28524c72007-02-27 05:47:44 +0000230 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000231 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000232 f.truncate(0)
233 self.assertEqual(f.tell(), 5)
234 f.seek(0)
235
236 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000237 self.assertEqual(f.seek(0), 0)
238 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000239 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000240 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000241 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000242 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000243 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000244 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000245 self.assertEqual(f.seek(-1, 2), 13)
246 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000247
Guido van Rossum87429772007-04-10 21:06:59 +0000248 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000249 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000250 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000251
Guido van Rossum9b76da62007-04-11 01:09:03 +0000252 def read_ops(self, f, buffered=False):
253 data = f.read(5)
254 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000255 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000256 self.assertEqual(f.readinto(data), 5)
257 self.assertEqual(data, b" worl")
258 self.assertEqual(f.readinto(data), 2)
259 self.assertEqual(len(data), 5)
260 self.assertEqual(data[:2], b"d\n")
261 self.assertEqual(f.seek(0), 0)
262 self.assertEqual(f.read(20), b"hello world\n")
263 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000264 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000265 self.assertEqual(f.seek(-6, 2), 6)
266 self.assertEqual(f.read(5), b"world")
267 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000268 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000269 self.assertEqual(f.seek(-6, 1), 5)
270 self.assertEqual(f.read(5), b" worl")
271 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000272 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000273 if buffered:
274 f.seek(0)
275 self.assertEqual(f.read(), b"hello world\n")
276 f.seek(6)
277 self.assertEqual(f.read(), b"world\n")
278 self.assertEqual(f.read(), b"")
279
Guido van Rossum34d69e52007-04-10 20:08:41 +0000280 LARGE = 2**31
281
Guido van Rossum53807da2007-04-10 19:01:47 +0000282 def large_file_ops(self, f):
283 assert f.readable()
284 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000285 self.assertEqual(f.seek(self.LARGE), self.LARGE)
286 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000287 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000288 self.assertEqual(f.tell(), self.LARGE + 3)
289 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000290 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000291 self.assertEqual(f.tell(), self.LARGE + 2)
292 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000293 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000294 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000295 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
296 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000297 self.assertEqual(f.read(2), b"x")
298
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000299 def test_invalid_operations(self):
300 # Try writing on a file opened in read mode and vice-versa.
301 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000302 with self.open(support.TESTFN, mode) as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000303 self.assertRaises(IOError, fp.read)
304 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000305 with self.open(support.TESTFN, "rb") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000306 self.assertRaises(IOError, fp.write, b"blah")
307 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000308 with self.open(support.TESTFN, "r") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000309 self.assertRaises(IOError, fp.write, "blah")
310 self.assertRaises(IOError, fp.writelines, ["blah\n"])
311
Guido van Rossum28524c72007-02-27 05:47:44 +0000312 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000313 with self.open(support.TESTFN, "wb", buffering=0) as f:
314 self.assertEqual(f.readable(), False)
315 self.assertEqual(f.writable(), True)
316 self.assertEqual(f.seekable(), True)
317 self.write_ops(f)
318 with self.open(support.TESTFN, "rb", buffering=0) as f:
319 self.assertEqual(f.readable(), True)
320 self.assertEqual(f.writable(), False)
321 self.assertEqual(f.seekable(), True)
322 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000323
Guido van Rossum87429772007-04-10 21:06:59 +0000324 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000325 with self.open(support.TESTFN, "wb") as f:
326 self.assertEqual(f.readable(), False)
327 self.assertEqual(f.writable(), True)
328 self.assertEqual(f.seekable(), True)
329 self.write_ops(f)
330 with self.open(support.TESTFN, "rb") as f:
331 self.assertEqual(f.readable(), True)
332 self.assertEqual(f.writable(), False)
333 self.assertEqual(f.seekable(), True)
334 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000335
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000336 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000337 with self.open(support.TESTFN, "wb") as f:
338 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
339 with self.open(support.TESTFN, "rb") as f:
340 self.assertEqual(f.readline(), b"abc\n")
341 self.assertEqual(f.readline(10), b"def\n")
342 self.assertEqual(f.readline(2), b"xy")
343 self.assertEqual(f.readline(4), b"zzy\n")
344 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000345 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000346 self.assertRaises(TypeError, f.readline, 5.3)
347 with self.open(support.TESTFN, "r") as f:
348 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000349
Guido van Rossum28524c72007-02-27 05:47:44 +0000350 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000351 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000352 self.write_ops(f)
353 data = f.getvalue()
354 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000355 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000356 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000357
Guido van Rossum53807da2007-04-10 19:01:47 +0000358 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000359 # On Windows and Mac OSX this test comsumes large resources; It takes
360 # a long time to build the >2GB file and takes >2GB of disk space
361 # therefore the resource must be enabled to run this test.
362 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000363 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000364 print("\nTesting large file ops skipped on %s." % sys.platform,
365 file=sys.stderr)
366 print("It requires %d bytes and a long time." % self.LARGE,
367 file=sys.stderr)
368 print("Use 'regrtest.py -u largefile test_io' to run it.",
369 file=sys.stderr)
370 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000371 with self.open(support.TESTFN, "w+b", 0) as f:
372 self.large_file_ops(f)
373 with self.open(support.TESTFN, "w+b") as f:
374 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000375
376 def test_with_open(self):
377 for bufsize in (0, 1, 100):
378 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000379 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000380 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000381 self.assertEqual(f.closed, True)
382 f = None
383 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000384 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000385 1/0
386 except ZeroDivisionError:
387 self.assertEqual(f.closed, True)
388 else:
389 self.fail("1/0 didn't raise an exception")
390
Antoine Pitrou08838b62009-01-21 00:55:13 +0000391 # issue 5008
392 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000393 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000394 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000395 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000396 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000397 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000398 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000399 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000400 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000401
Guido van Rossum87429772007-04-10 21:06:59 +0000402 def test_destructor(self):
403 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000404 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000405 def __del__(self):
406 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000407 try:
408 f = super().__del__
409 except AttributeError:
410 pass
411 else:
412 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000413 def close(self):
414 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000415 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000416 def flush(self):
417 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000418 super().flush()
419 f = MyFileIO(support.TESTFN, "wb")
420 f.write(b"xxx")
421 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000422 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000423 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000424 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000425 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000426
427 def _check_base_destructor(self, base):
428 record = []
429 class MyIO(base):
430 def __init__(self):
431 # This exercises the availability of attributes on object
432 # destruction.
433 # (in the C version, close() is called by the tp_dealloc
434 # function, not by __del__)
435 self.on_del = 1
436 self.on_close = 2
437 self.on_flush = 3
438 def __del__(self):
439 record.append(self.on_del)
440 try:
441 f = super().__del__
442 except AttributeError:
443 pass
444 else:
445 f()
446 def close(self):
447 record.append(self.on_close)
448 super().close()
449 def flush(self):
450 record.append(self.on_flush)
451 super().flush()
452 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000453 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000454 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000455 self.assertEqual(record, [1, 2, 3])
456
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000457 def test_IOBase_destructor(self):
458 self._check_base_destructor(self.IOBase)
459
460 def test_RawIOBase_destructor(self):
461 self._check_base_destructor(self.RawIOBase)
462
463 def test_BufferedIOBase_destructor(self):
464 self._check_base_destructor(self.BufferedIOBase)
465
466 def test_TextIOBase_destructor(self):
467 self._check_base_destructor(self.TextIOBase)
468
Guido van Rossum87429772007-04-10 21:06:59 +0000469 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000470 with self.open(support.TESTFN, "wb") as f:
471 f.write(b"xxx")
472 with self.open(support.TESTFN, "rb") as f:
473 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000474
Guido van Rossumd4103952007-04-12 05:44:49 +0000475 def test_array_writes(self):
476 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000477 n = len(a.tostring())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000478 with self.open(support.TESTFN, "wb", 0) as f:
479 self.assertEqual(f.write(a), n)
480 with self.open(support.TESTFN, "wb") as f:
481 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000482
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000483 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000484 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000485 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000486
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000487 def test_read_closed(self):
488 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000489 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000490 with self.open(support.TESTFN, "r") as f:
491 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000492 self.assertEqual(file.read(), "egg\n")
493 file.seek(0)
494 file.close()
495 self.assertRaises(ValueError, file.read)
496
497 def test_no_closefd_with_filename(self):
498 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000499 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000500
501 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000502 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000503 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000504 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000505 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000506 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000507 self.assertEqual(file.buffer.raw.closefd, False)
508
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000509 def test_garbage_collection(self):
510 # FileIO objects are collected, and collecting them flushes
511 # all data to disk.
512 f = self.FileIO(support.TESTFN, "wb")
513 f.write(b"abcxxx")
514 f.f = f
515 wr = weakref.ref(f)
516 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000517 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000518 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000519 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000520 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000521
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000522 def test_unbounded_file(self):
523 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
524 zero = "/dev/zero"
525 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000526 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000527 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000528 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000529 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000530 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000531 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000532 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000533 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000534 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000535 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000536 self.assertRaises(OverflowError, f.read)
537
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000538class CIOTest(IOTest):
539 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000540
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000541class PyIOTest(IOTest):
542 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000543
Guido van Rossuma9e20242007-03-08 00:43:48 +0000544
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000545class CommonBufferedTests:
546 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
547
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000548 def test_detach(self):
549 raw = self.MockRawIO()
550 buf = self.tp(raw)
551 self.assertIs(buf.detach(), raw)
552 self.assertRaises(ValueError, buf.detach)
553
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000554 def test_fileno(self):
555 rawio = self.MockRawIO()
556 bufio = self.tp(rawio)
557
558 self.assertEquals(42, bufio.fileno())
559
560 def test_no_fileno(self):
561 # XXX will we always have fileno() function? If so, kill
562 # this test. Else, write it.
563 pass
564
565 def test_invalid_args(self):
566 rawio = self.MockRawIO()
567 bufio = self.tp(rawio)
568 # Invalid whence
569 self.assertRaises(ValueError, bufio.seek, 0, -1)
570 self.assertRaises(ValueError, bufio.seek, 0, 3)
571
572 def test_override_destructor(self):
573 tp = self.tp
574 record = []
575 class MyBufferedIO(tp):
576 def __del__(self):
577 record.append(1)
578 try:
579 f = super().__del__
580 except AttributeError:
581 pass
582 else:
583 f()
584 def close(self):
585 record.append(2)
586 super().close()
587 def flush(self):
588 record.append(3)
589 super().flush()
590 rawio = self.MockRawIO()
591 bufio = MyBufferedIO(rawio)
592 writable = bufio.writable()
593 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000594 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000595 if writable:
596 self.assertEqual(record, [1, 2, 3])
597 else:
598 self.assertEqual(record, [1, 2])
599
600 def test_context_manager(self):
601 # Test usability as a context manager
602 rawio = self.MockRawIO()
603 bufio = self.tp(rawio)
604 def _with():
605 with bufio:
606 pass
607 _with()
608 # bufio should now be closed, and using it a second time should raise
609 # a ValueError.
610 self.assertRaises(ValueError, _with)
611
612 def test_error_through_destructor(self):
613 # Test that the exception state is not modified by a destructor,
614 # even if close() fails.
615 rawio = self.CloseFailureIO()
616 def f():
617 self.tp(rawio).xyzzy
618 with support.captured_output("stderr") as s:
619 self.assertRaises(AttributeError, f)
620 s = s.getvalue().strip()
621 if s:
622 # The destructor *may* have printed an unraisable error, check it
623 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000624 self.assertTrue(s.startswith("Exception IOError: "), s)
625 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000626
Antoine Pitrou716c4442009-05-23 19:04:03 +0000627 def test_repr(self):
628 raw = self.MockRawIO()
629 b = self.tp(raw)
630 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
631 self.assertEqual(repr(b), "<%s>" % clsname)
632 raw.name = "dummy"
633 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
634 raw.name = b"dummy"
635 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
636
Guido van Rossum78892e42007-04-06 17:31:18 +0000637
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000638class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
639 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000640
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000641 def test_constructor(self):
642 rawio = self.MockRawIO([b"abc"])
643 bufio = self.tp(rawio)
644 bufio.__init__(rawio)
645 bufio.__init__(rawio, buffer_size=1024)
646 bufio.__init__(rawio, buffer_size=16)
647 self.assertEquals(b"abc", bufio.read())
648 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
649 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
650 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
651 rawio = self.MockRawIO([b"abc"])
652 bufio.__init__(rawio)
653 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000654
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000655 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000656 for arg in (None, 7):
657 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
658 bufio = self.tp(rawio)
659 self.assertEquals(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000660 # Invalid args
661 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000662
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000663 def test_read1(self):
664 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
665 bufio = self.tp(rawio)
666 self.assertEquals(b"a", bufio.read(1))
667 self.assertEquals(b"b", bufio.read1(1))
668 self.assertEquals(rawio._reads, 1)
669 self.assertEquals(b"c", bufio.read1(100))
670 self.assertEquals(rawio._reads, 1)
671 self.assertEquals(b"d", bufio.read1(100))
672 self.assertEquals(rawio._reads, 2)
673 self.assertEquals(b"efg", bufio.read1(100))
674 self.assertEquals(rawio._reads, 3)
675 self.assertEquals(b"", bufio.read1(100))
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000676 self.assertEquals(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000677 # Invalid args
678 self.assertRaises(ValueError, bufio.read1, -1)
679
680 def test_readinto(self):
681 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
682 bufio = self.tp(rawio)
683 b = bytearray(2)
684 self.assertEquals(bufio.readinto(b), 2)
685 self.assertEquals(b, b"ab")
686 self.assertEquals(bufio.readinto(b), 2)
687 self.assertEquals(b, b"cd")
688 self.assertEquals(bufio.readinto(b), 2)
689 self.assertEquals(b, b"ef")
690 self.assertEquals(bufio.readinto(b), 1)
691 self.assertEquals(b, b"gf")
692 self.assertEquals(bufio.readinto(b), 0)
693 self.assertEquals(b, b"gf")
694
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000695 def test_readlines(self):
696 def bufio():
697 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
698 return self.tp(rawio)
699 self.assertEquals(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
700 self.assertEquals(bufio().readlines(5), [b"abc\n", b"d\n"])
701 self.assertEquals(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
702
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000703 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000704 data = b"abcdefghi"
705 dlen = len(data)
706
707 tests = [
708 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
709 [ 100, [ 3, 3, 3], [ dlen ] ],
710 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
711 ]
712
713 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000714 rawio = self.MockFileIO(data)
715 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000716 pos = 0
717 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000718 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000719 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000720 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000721 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000722
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000723 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000724 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000725 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
726 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000727
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000728 self.assertEquals(b"abcd", bufio.read(6))
729 self.assertEquals(b"e", bufio.read(1))
730 self.assertEquals(b"fg", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000731 self.assertEquals(b"", bufio.peek(1))
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000732 self.assertTrue(None is bufio.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000733 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000734
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000735 def test_read_past_eof(self):
736 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
737 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000738
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000739 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000740
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000741 def test_read_all(self):
742 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
743 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000744
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000745 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000746
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000747 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000748 try:
749 # Write out many bytes with exactly the same number of 0's,
750 # 1's... 255's. This will help us check that concurrent reading
751 # doesn't duplicate or forget contents.
752 N = 1000
753 l = list(range(256)) * N
754 random.shuffle(l)
755 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000756 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000757 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000758 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000759 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000760 errors = []
761 results = []
762 def f():
763 try:
764 # Intra-buffer read then buffer-flushing read
765 for n in cycle([1, 19]):
766 s = bufio.read(n)
767 if not s:
768 break
769 # list.append() is atomic
770 results.append(s)
771 except Exception as e:
772 errors.append(e)
773 raise
774 threads = [threading.Thread(target=f) for x in range(20)]
775 for t in threads:
776 t.start()
777 time.sleep(0.02) # yield
778 for t in threads:
779 t.join()
780 self.assertFalse(errors,
781 "the following exceptions were caught: %r" % errors)
782 s = b''.join(results)
783 for i in range(256):
784 c = bytes(bytearray([i]))
785 self.assertEqual(s.count(c), N)
786 finally:
787 support.unlink(support.TESTFN)
788
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000789 def test_misbehaved_io(self):
790 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
791 bufio = self.tp(rawio)
792 self.assertRaises(IOError, bufio.seek, 0)
793 self.assertRaises(IOError, bufio.tell)
794
795class CBufferedReaderTest(BufferedReaderTest):
796 tp = io.BufferedReader
797
798 def test_constructor(self):
799 BufferedReaderTest.test_constructor(self)
800 # The allocation can succeed on 32-bit builds, e.g. with more
801 # than 2GB RAM and a 64-bit kernel.
802 if sys.maxsize > 0x7FFFFFFF:
803 rawio = self.MockRawIO()
804 bufio = self.tp(rawio)
805 self.assertRaises((OverflowError, MemoryError, ValueError),
806 bufio.__init__, rawio, sys.maxsize)
807
808 def test_initialization(self):
809 rawio = self.MockRawIO([b"abc"])
810 bufio = self.tp(rawio)
811 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
812 self.assertRaises(ValueError, bufio.read)
813 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
814 self.assertRaises(ValueError, bufio.read)
815 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
816 self.assertRaises(ValueError, bufio.read)
817
818 def test_misbehaved_io_read(self):
819 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
820 bufio = self.tp(rawio)
821 # _pyio.BufferedReader seems to implement reading different, so that
822 # checking this is not so easy.
823 self.assertRaises(IOError, bufio.read, 10)
824
825 def test_garbage_collection(self):
826 # C BufferedReader objects are collected.
827 # The Python version has __del__, so it ends into gc.garbage instead
828 rawio = self.FileIO(support.TESTFN, "w+b")
829 f = self.tp(rawio)
830 f.f = f
831 wr = weakref.ref(f)
832 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000833 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000834 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000835
836class PyBufferedReaderTest(BufferedReaderTest):
837 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000838
Guido van Rossuma9e20242007-03-08 00:43:48 +0000839
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000840class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
841 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000842
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000843 def test_constructor(self):
844 rawio = self.MockRawIO()
845 bufio = self.tp(rawio)
846 bufio.__init__(rawio)
847 bufio.__init__(rawio, buffer_size=1024)
848 bufio.__init__(rawio, buffer_size=16)
849 self.assertEquals(3, bufio.write(b"abc"))
850 bufio.flush()
851 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
852 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
853 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
854 bufio.__init__(rawio)
855 self.assertEquals(3, bufio.write(b"ghi"))
856 bufio.flush()
857 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
858
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000859 def test_detach_flush(self):
860 raw = self.MockRawIO()
861 buf = self.tp(raw)
862 buf.write(b"howdy!")
863 self.assertFalse(raw._write_stack)
864 buf.detach()
865 self.assertEqual(raw._write_stack, [b"howdy!"])
866
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000867 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000868 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000869 writer = self.MockRawIO()
870 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000871 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000872 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000873
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000874 def test_write_overflow(self):
875 writer = self.MockRawIO()
876 bufio = self.tp(writer, 8)
877 contents = b"abcdefghijklmnop"
878 for n in range(0, len(contents), 3):
879 bufio.write(contents[n:n+3])
880 flushed = b"".join(writer._write_stack)
881 # At least (total - 8) bytes were implicitly flushed, perhaps more
882 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000883 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000884
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000885 def check_writes(self, intermediate_func):
886 # Lots of writes, test the flushed output is as expected.
887 contents = bytes(range(256)) * 1000
888 n = 0
889 writer = self.MockRawIO()
890 bufio = self.tp(writer, 13)
891 # Generator of write sizes: repeat each N 15 times then proceed to N+1
892 def gen_sizes():
893 for size in count(1):
894 for i in range(15):
895 yield size
896 sizes = gen_sizes()
897 while n < len(contents):
898 size = min(next(sizes), len(contents) - n)
899 self.assertEquals(bufio.write(contents[n:n+size]), size)
900 intermediate_func(bufio)
901 n += size
902 bufio.flush()
903 self.assertEquals(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000904
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000905 def test_writes(self):
906 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000907
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000908 def test_writes_and_flushes(self):
909 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +0000910
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000911 def test_writes_and_seeks(self):
912 def _seekabs(bufio):
913 pos = bufio.tell()
914 bufio.seek(pos + 1, 0)
915 bufio.seek(pos - 1, 0)
916 bufio.seek(pos, 0)
917 self.check_writes(_seekabs)
918 def _seekrel(bufio):
919 pos = bufio.seek(0, 1)
920 bufio.seek(+1, 1)
921 bufio.seek(-1, 1)
922 bufio.seek(pos, 0)
923 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +0000924
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000925 def test_writes_and_truncates(self):
926 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +0000927
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000928 def test_write_non_blocking(self):
929 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +0000930 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +0000931
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000932 self.assertEquals(bufio.write(b"abcd"), 4)
933 self.assertEquals(bufio.write(b"efghi"), 5)
934 # 1 byte will be written, the rest will be buffered
935 raw.block_on(b"k")
936 self.assertEquals(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +0000937
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000938 # 8 bytes will be written, 8 will be buffered and the rest will be lost
939 raw.block_on(b"0")
940 try:
941 bufio.write(b"opqrwxyz0123456789")
942 except self.BlockingIOError as e:
943 written = e.characters_written
944 else:
945 self.fail("BlockingIOError should have been raised")
946 self.assertEquals(written, 16)
947 self.assertEquals(raw.pop_written(),
948 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +0000949
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000950 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
951 s = raw.pop_written()
952 # Previously buffered bytes were flushed
953 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +0000954
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000955 def test_write_and_rewind(self):
956 raw = io.BytesIO()
957 bufio = self.tp(raw, 4)
958 self.assertEqual(bufio.write(b"abcdef"), 6)
959 self.assertEqual(bufio.tell(), 6)
960 bufio.seek(0, 0)
961 self.assertEqual(bufio.write(b"XY"), 2)
962 bufio.seek(6, 0)
963 self.assertEqual(raw.getvalue(), b"XYcdef")
964 self.assertEqual(bufio.write(b"123456"), 6)
965 bufio.flush()
966 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000967
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000968 def test_flush(self):
969 writer = self.MockRawIO()
970 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000971 bufio.write(b"abc")
972 bufio.flush()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000973 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000974
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000975 def test_destructor(self):
976 writer = self.MockRawIO()
977 bufio = self.tp(writer, 8)
978 bufio.write(b"abc")
979 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000980 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000981 self.assertEquals(b"abc", writer._write_stack[0])
982
983 def test_truncate(self):
984 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000985 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000986 bufio = self.tp(raw, 8)
987 bufio.write(b"abcdef")
988 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000989 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000990 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000991 self.assertEqual(f.read(), b"abc")
992
993 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000994 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000995 # Write out many bytes from many threads and test they were
996 # all flushed.
997 N = 1000
998 contents = bytes(range(256)) * N
999 sizes = cycle([1, 19])
1000 n = 0
1001 queue = deque()
1002 while n < len(contents):
1003 size = next(sizes)
1004 queue.append(contents[n:n+size])
1005 n += size
1006 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001007 # We use a real file object because it allows us to
1008 # exercise situations where the GIL is released before
1009 # writing the buffer to the raw streams. This is in addition
1010 # to concurrency issues due to switching threads in the middle
1011 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001012 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001013 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001014 errors = []
1015 def f():
1016 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001017 while True:
1018 try:
1019 s = queue.popleft()
1020 except IndexError:
1021 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001022 bufio.write(s)
1023 except Exception as e:
1024 errors.append(e)
1025 raise
1026 threads = [threading.Thread(target=f) for x in range(20)]
1027 for t in threads:
1028 t.start()
1029 time.sleep(0.02) # yield
1030 for t in threads:
1031 t.join()
1032 self.assertFalse(errors,
1033 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001034 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001035 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001036 s = f.read()
1037 for i in range(256):
1038 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001039 finally:
1040 support.unlink(support.TESTFN)
1041
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001042 def test_misbehaved_io(self):
1043 rawio = self.MisbehavedRawIO()
1044 bufio = self.tp(rawio, 5)
1045 self.assertRaises(IOError, bufio.seek, 0)
1046 self.assertRaises(IOError, bufio.tell)
1047 self.assertRaises(IOError, bufio.write, b"abcdef")
1048
Benjamin Peterson59406a92009-03-26 17:10:29 +00001049 def test_max_buffer_size_deprecation(self):
1050 with support.check_warnings() as w:
1051 warnings.simplefilter("always", DeprecationWarning)
1052 self.tp(self.MockRawIO(), 8, 12)
1053 self.assertEqual(len(w.warnings), 1)
1054 warning = w.warnings[0]
1055 self.assertTrue(warning.category is DeprecationWarning)
1056 self.assertEqual(str(warning.message),
1057 "max_buffer_size is deprecated")
1058
1059
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001060class CBufferedWriterTest(BufferedWriterTest):
1061 tp = io.BufferedWriter
1062
1063 def test_constructor(self):
1064 BufferedWriterTest.test_constructor(self)
1065 # The allocation can succeed on 32-bit builds, e.g. with more
1066 # than 2GB RAM and a 64-bit kernel.
1067 if sys.maxsize > 0x7FFFFFFF:
1068 rawio = self.MockRawIO()
1069 bufio = self.tp(rawio)
1070 self.assertRaises((OverflowError, MemoryError, ValueError),
1071 bufio.__init__, rawio, sys.maxsize)
1072
1073 def test_initialization(self):
1074 rawio = self.MockRawIO()
1075 bufio = self.tp(rawio)
1076 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1077 self.assertRaises(ValueError, bufio.write, b"def")
1078 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1079 self.assertRaises(ValueError, bufio.write, b"def")
1080 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1081 self.assertRaises(ValueError, bufio.write, b"def")
1082
1083 def test_garbage_collection(self):
1084 # C BufferedWriter objects are collected, and collecting them flushes
1085 # all data to disk.
1086 # The Python version has __del__, so it ends into gc.garbage instead
1087 rawio = self.FileIO(support.TESTFN, "w+b")
1088 f = self.tp(rawio)
1089 f.write(b"123xxx")
1090 f.x = f
1091 wr = weakref.ref(f)
1092 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001093 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001094 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001095 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001096 self.assertEqual(f.read(), b"123xxx")
1097
1098
1099class PyBufferedWriterTest(BufferedWriterTest):
1100 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001101
Guido van Rossum01a27522007-03-07 01:00:12 +00001102class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001103
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001104 def test_constructor(self):
1105 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001106 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001107
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001108 def test_detach(self):
1109 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1110 self.assertRaises(self.UnsupportedOperation, pair.detach)
1111
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001112 def test_constructor_max_buffer_size_deprecation(self):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001113 with support.check_warnings() as w:
1114 warnings.simplefilter("always", DeprecationWarning)
1115 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1116 self.assertEqual(len(w.warnings), 1)
1117 warning = w.warnings[0]
1118 self.assertTrue(warning.category is DeprecationWarning)
1119 self.assertEqual(str(warning.message),
1120 "max_buffer_size is deprecated")
1121
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001122 def test_constructor_with_not_readable(self):
1123 class NotReadable(MockRawIO):
1124 def readable(self):
1125 return False
1126
1127 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1128
1129 def test_constructor_with_not_writeable(self):
1130 class NotWriteable(MockRawIO):
1131 def writable(self):
1132 return False
1133
1134 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1135
1136 def test_read(self):
1137 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1138
1139 self.assertEqual(pair.read(3), b"abc")
1140 self.assertEqual(pair.read(1), b"d")
1141 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001142 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1143 self.assertEqual(pair.read(None), b"abc")
1144
1145 def test_readlines(self):
1146 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1147 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1148 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1149 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001150
1151 def test_read1(self):
1152 # .read1() is delegated to the underlying reader object, so this test
1153 # can be shallow.
1154 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1155
1156 self.assertEqual(pair.read1(3), b"abc")
1157
1158 def test_readinto(self):
1159 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1160
1161 data = bytearray(5)
1162 self.assertEqual(pair.readinto(data), 5)
1163 self.assertEqual(data, b"abcde")
1164
1165 def test_write(self):
1166 w = self.MockRawIO()
1167 pair = self.tp(self.MockRawIO(), w)
1168
1169 pair.write(b"abc")
1170 pair.flush()
1171 pair.write(b"def")
1172 pair.flush()
1173 self.assertEqual(w._write_stack, [b"abc", b"def"])
1174
1175 def test_peek(self):
1176 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1177
1178 self.assertTrue(pair.peek(3).startswith(b"abc"))
1179 self.assertEqual(pair.read(3), b"abc")
1180
1181 def test_readable(self):
1182 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1183 self.assertTrue(pair.readable())
1184
1185 def test_writeable(self):
1186 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1187 self.assertTrue(pair.writable())
1188
1189 def test_seekable(self):
1190 # BufferedRWPairs are never seekable, even if their readers and writers
1191 # are.
1192 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1193 self.assertFalse(pair.seekable())
1194
1195 # .flush() is delegated to the underlying writer object and has been
1196 # tested in the test_write method.
1197
1198 def test_close_and_closed(self):
1199 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1200 self.assertFalse(pair.closed)
1201 pair.close()
1202 self.assertTrue(pair.closed)
1203
1204 def test_isatty(self):
1205 class SelectableIsAtty(MockRawIO):
1206 def __init__(self, isatty):
1207 MockRawIO.__init__(self)
1208 self._isatty = isatty
1209
1210 def isatty(self):
1211 return self._isatty
1212
1213 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1214 self.assertFalse(pair.isatty())
1215
1216 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1217 self.assertTrue(pair.isatty())
1218
1219 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1220 self.assertTrue(pair.isatty())
1221
1222 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1223 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001224
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001225class CBufferedRWPairTest(BufferedRWPairTest):
1226 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001227
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001228class PyBufferedRWPairTest(BufferedRWPairTest):
1229 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001230
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001231
1232class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1233 read_mode = "rb+"
1234 write_mode = "wb+"
1235
1236 def test_constructor(self):
1237 BufferedReaderTest.test_constructor(self)
1238 BufferedWriterTest.test_constructor(self)
1239
1240 def test_read_and_write(self):
1241 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001242 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001243
1244 self.assertEqual(b"as", rw.read(2))
1245 rw.write(b"ddd")
1246 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001247 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001248 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001249 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001250
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001251 def test_seek_and_tell(self):
1252 raw = self.BytesIO(b"asdfghjkl")
1253 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001254
1255 self.assertEquals(b"as", rw.read(2))
1256 self.assertEquals(2, rw.tell())
1257 rw.seek(0, 0)
1258 self.assertEquals(b"asdf", rw.read(4))
1259
1260 rw.write(b"asdf")
1261 rw.seek(0, 0)
1262 self.assertEquals(b"asdfasdfl", rw.read())
1263 self.assertEquals(9, rw.tell())
1264 rw.seek(-4, 2)
1265 self.assertEquals(5, rw.tell())
1266 rw.seek(2, 1)
1267 self.assertEquals(7, rw.tell())
1268 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001269 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001270
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001271 def check_flush_and_read(self, read_func):
1272 raw = self.BytesIO(b"abcdefghi")
1273 bufio = self.tp(raw)
1274
1275 self.assertEquals(b"ab", read_func(bufio, 2))
1276 bufio.write(b"12")
1277 self.assertEquals(b"ef", read_func(bufio, 2))
1278 self.assertEquals(6, bufio.tell())
1279 bufio.flush()
1280 self.assertEquals(6, bufio.tell())
1281 self.assertEquals(b"ghi", read_func(bufio))
1282 raw.seek(0, 0)
1283 raw.write(b"XYZ")
1284 # flush() resets the read buffer
1285 bufio.flush()
1286 bufio.seek(0, 0)
1287 self.assertEquals(b"XYZ", read_func(bufio, 3))
1288
1289 def test_flush_and_read(self):
1290 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1291
1292 def test_flush_and_readinto(self):
1293 def _readinto(bufio, n=-1):
1294 b = bytearray(n if n >= 0 else 9999)
1295 n = bufio.readinto(b)
1296 return bytes(b[:n])
1297 self.check_flush_and_read(_readinto)
1298
1299 def test_flush_and_peek(self):
1300 def _peek(bufio, n=-1):
1301 # This relies on the fact that the buffer can contain the whole
1302 # raw stream, otherwise peek() can return less.
1303 b = bufio.peek(n)
1304 if n != -1:
1305 b = b[:n]
1306 bufio.seek(len(b), 1)
1307 return b
1308 self.check_flush_and_read(_peek)
1309
1310 def test_flush_and_write(self):
1311 raw = self.BytesIO(b"abcdefghi")
1312 bufio = self.tp(raw)
1313
1314 bufio.write(b"123")
1315 bufio.flush()
1316 bufio.write(b"45")
1317 bufio.flush()
1318 bufio.seek(0, 0)
1319 self.assertEquals(b"12345fghi", raw.getvalue())
1320 self.assertEquals(b"12345fghi", bufio.read())
1321
1322 def test_threads(self):
1323 BufferedReaderTest.test_threads(self)
1324 BufferedWriterTest.test_threads(self)
1325
1326 def test_writes_and_peek(self):
1327 def _peek(bufio):
1328 bufio.peek(1)
1329 self.check_writes(_peek)
1330 def _peek(bufio):
1331 pos = bufio.tell()
1332 bufio.seek(-1, 1)
1333 bufio.peek(1)
1334 bufio.seek(pos, 0)
1335 self.check_writes(_peek)
1336
1337 def test_writes_and_reads(self):
1338 def _read(bufio):
1339 bufio.seek(-1, 1)
1340 bufio.read(1)
1341 self.check_writes(_read)
1342
1343 def test_writes_and_read1s(self):
1344 def _read1(bufio):
1345 bufio.seek(-1, 1)
1346 bufio.read1(1)
1347 self.check_writes(_read1)
1348
1349 def test_writes_and_readintos(self):
1350 def _read(bufio):
1351 bufio.seek(-1, 1)
1352 bufio.readinto(bytearray(1))
1353 self.check_writes(_read)
1354
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001355 def test_write_after_readahead(self):
1356 # Issue #6629: writing after the buffer was filled by readahead should
1357 # first rewind the raw stream.
1358 for overwrite_size in [1, 5]:
1359 raw = self.BytesIO(b"A" * 10)
1360 bufio = self.tp(raw, 4)
1361 # Trigger readahead
1362 self.assertEqual(bufio.read(1), b"A")
1363 self.assertEqual(bufio.tell(), 1)
1364 # Overwriting should rewind the raw stream if it needs so
1365 bufio.write(b"B" * overwrite_size)
1366 self.assertEqual(bufio.tell(), overwrite_size + 1)
1367 # If the write size was smaller than the buffer size, flush() and
1368 # check that rewind happens.
1369 bufio.flush()
1370 self.assertEqual(bufio.tell(), overwrite_size + 1)
1371 s = raw.getvalue()
1372 self.assertEqual(s,
1373 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1374
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001375 def test_truncate_after_read_or_write(self):
1376 raw = self.BytesIO(b"A" * 10)
1377 bufio = self.tp(raw, 100)
1378 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1379 self.assertEqual(bufio.truncate(), 2)
1380 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1381 self.assertEqual(bufio.truncate(), 4)
1382
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001383 def test_misbehaved_io(self):
1384 BufferedReaderTest.test_misbehaved_io(self)
1385 BufferedWriterTest.test_misbehaved_io(self)
1386
1387class CBufferedRandomTest(BufferedRandomTest):
1388 tp = io.BufferedRandom
1389
1390 def test_constructor(self):
1391 BufferedRandomTest.test_constructor(self)
1392 # The allocation can succeed on 32-bit builds, e.g. with more
1393 # than 2GB RAM and a 64-bit kernel.
1394 if sys.maxsize > 0x7FFFFFFF:
1395 rawio = self.MockRawIO()
1396 bufio = self.tp(rawio)
1397 self.assertRaises((OverflowError, MemoryError, ValueError),
1398 bufio.__init__, rawio, sys.maxsize)
1399
1400 def test_garbage_collection(self):
1401 CBufferedReaderTest.test_garbage_collection(self)
1402 CBufferedWriterTest.test_garbage_collection(self)
1403
1404class PyBufferedRandomTest(BufferedRandomTest):
1405 tp = pyio.BufferedRandom
1406
1407
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001408# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1409# properties:
1410# - A single output character can correspond to many bytes of input.
1411# - The number of input bytes to complete the character can be
1412# undetermined until the last input byte is received.
1413# - The number of input bytes can vary depending on previous input.
1414# - A single input byte can correspond to many characters of output.
1415# - The number of output characters can be undetermined until the
1416# last input byte is received.
1417# - The number of output characters can vary depending on previous input.
1418
1419class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1420 """
1421 For testing seek/tell behavior with a stateful, buffering decoder.
1422
1423 Input is a sequence of words. Words may be fixed-length (length set
1424 by input) or variable-length (period-terminated). In variable-length
1425 mode, extra periods are ignored. Possible words are:
1426 - 'i' followed by a number sets the input length, I (maximum 99).
1427 When I is set to 0, words are space-terminated.
1428 - 'o' followed by a number sets the output length, O (maximum 99).
1429 - Any other word is converted into a word followed by a period on
1430 the output. The output word consists of the input word truncated
1431 or padded out with hyphens to make its length equal to O. If O
1432 is 0, the word is output verbatim without truncating or padding.
1433 I and O are initially set to 1. When I changes, any buffered input is
1434 re-scanned according to the new I. EOF also terminates the last word.
1435 """
1436
1437 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001438 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001439 self.reset()
1440
1441 def __repr__(self):
1442 return '<SID %x>' % id(self)
1443
1444 def reset(self):
1445 self.i = 1
1446 self.o = 1
1447 self.buffer = bytearray()
1448
1449 def getstate(self):
1450 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1451 return bytes(self.buffer), i*100 + o
1452
1453 def setstate(self, state):
1454 buffer, io = state
1455 self.buffer = bytearray(buffer)
1456 i, o = divmod(io, 100)
1457 self.i, self.o = i ^ 1, o ^ 1
1458
1459 def decode(self, input, final=False):
1460 output = ''
1461 for b in input:
1462 if self.i == 0: # variable-length, terminated with period
1463 if b == ord('.'):
1464 if self.buffer:
1465 output += self.process_word()
1466 else:
1467 self.buffer.append(b)
1468 else: # fixed-length, terminate after self.i bytes
1469 self.buffer.append(b)
1470 if len(self.buffer) == self.i:
1471 output += self.process_word()
1472 if final and self.buffer: # EOF terminates the last word
1473 output += self.process_word()
1474 return output
1475
1476 def process_word(self):
1477 output = ''
1478 if self.buffer[0] == ord('i'):
1479 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1480 elif self.buffer[0] == ord('o'):
1481 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1482 else:
1483 output = self.buffer.decode('ascii')
1484 if len(output) < self.o:
1485 output += '-'*self.o # pad out with hyphens
1486 if self.o:
1487 output = output[:self.o] # truncate to output length
1488 output += '.'
1489 self.buffer = bytearray()
1490 return output
1491
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001492 codecEnabled = False
1493
1494 @classmethod
1495 def lookupTestDecoder(cls, name):
1496 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001497 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001498 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001499 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001500 incrementalencoder=None,
1501 streamreader=None, streamwriter=None,
1502 incrementaldecoder=cls)
1503
1504# Register the previous decoder for testing.
1505# Disabled by default, tests will enable it.
1506codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1507
1508
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001509class StatefulIncrementalDecoderTest(unittest.TestCase):
1510 """
1511 Make sure the StatefulIncrementalDecoder actually works.
1512 """
1513
1514 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001515 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001516 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001517 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001518 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001519 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001520 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001521 # I=0, O=6 (variable-length input, fixed-length output)
1522 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1523 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001524 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001525 # I=6, O=3 (fixed-length input > fixed-length output)
1526 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1527 # I=0, then 3; O=29, then 15 (with longer output)
1528 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1529 'a----------------------------.' +
1530 'b----------------------------.' +
1531 'cde--------------------------.' +
1532 'abcdefghijabcde.' +
1533 'a.b------------.' +
1534 '.c.------------.' +
1535 'd.e------------.' +
1536 'k--------------.' +
1537 'l--------------.' +
1538 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001539 ]
1540
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001541 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001542 # Try a few one-shot test cases.
1543 for input, eof, output in self.test_cases:
1544 d = StatefulIncrementalDecoder()
1545 self.assertEquals(d.decode(input, eof), output)
1546
1547 # Also test an unfinished decode, followed by forcing EOF.
1548 d = StatefulIncrementalDecoder()
1549 self.assertEquals(d.decode(b'oiabcd'), '')
1550 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001551
1552class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001553
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001554 def setUp(self):
1555 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1556 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001557 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001558
Guido van Rossumd0712812007-04-11 16:32:43 +00001559 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001560 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001561
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001562 def test_constructor(self):
1563 r = self.BytesIO(b"\xc3\xa9\n\n")
1564 b = self.BufferedReader(r, 1000)
1565 t = self.TextIOWrapper(b)
1566 t.__init__(b, encoding="latin1", newline="\r\n")
1567 self.assertEquals(t.encoding, "latin1")
1568 self.assertEquals(t.line_buffering, False)
1569 t.__init__(b, encoding="utf8", line_buffering=True)
1570 self.assertEquals(t.encoding, "utf8")
1571 self.assertEquals(t.line_buffering, True)
1572 self.assertEquals("\xe9\n", t.readline())
1573 self.assertRaises(TypeError, t.__init__, b, newline=42)
1574 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1575
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001576 def test_detach(self):
1577 r = self.BytesIO()
1578 b = self.BufferedWriter(r)
1579 t = self.TextIOWrapper(b)
1580 self.assertIs(t.detach(), b)
1581
1582 t = self.TextIOWrapper(b, encoding="ascii")
1583 t.write("howdy")
1584 self.assertFalse(r.getvalue())
1585 t.detach()
1586 self.assertEqual(r.getvalue(), b"howdy")
1587 self.assertRaises(ValueError, t.detach)
1588
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001589 def test_repr(self):
1590 raw = self.BytesIO("hello".encode("utf-8"))
1591 b = self.BufferedReader(raw)
1592 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001593 modname = self.TextIOWrapper.__module__
1594 self.assertEqual(repr(t),
1595 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1596 raw.name = "dummy"
1597 self.assertEqual(repr(t),
1598 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1599 raw.name = b"dummy"
1600 self.assertEqual(repr(t),
1601 "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001602
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001603 def test_line_buffering(self):
1604 r = self.BytesIO()
1605 b = self.BufferedWriter(r, 1000)
1606 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001607 t.write("X")
1608 self.assertEquals(r.getvalue(), b"") # No flush happened
1609 t.write("Y\nZ")
1610 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1611 t.write("A\rB")
1612 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1613
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001614 def test_encoding(self):
1615 # Check the encoding attribute is always set, and valid
1616 b = self.BytesIO()
1617 t = self.TextIOWrapper(b, encoding="utf8")
1618 self.assertEqual(t.encoding, "utf8")
1619 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001620 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001621 codecs.lookup(t.encoding)
1622
1623 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001624 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001625 b = self.BytesIO(b"abc\n\xff\n")
1626 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001627 self.assertRaises(UnicodeError, t.read)
1628 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001629 b = self.BytesIO(b"abc\n\xff\n")
1630 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001631 self.assertRaises(UnicodeError, t.read)
1632 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001633 b = self.BytesIO(b"abc\n\xff\n")
1634 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001635 self.assertEquals(t.read(), "abc\n\n")
1636 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001637 b = self.BytesIO(b"abc\n\xff\n")
1638 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001639 self.assertEquals(t.read(), "abc\n\ufffd\n")
1640
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001641 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001642 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001643 b = self.BytesIO()
1644 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001645 self.assertRaises(UnicodeError, t.write, "\xff")
1646 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001647 b = self.BytesIO()
1648 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001649 self.assertRaises(UnicodeError, t.write, "\xff")
1650 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001651 b = self.BytesIO()
1652 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001653 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001654 t.write("abc\xffdef\n")
1655 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001656 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001657 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001658 b = self.BytesIO()
1659 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001660 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001661 t.write("abc\xffdef\n")
1662 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001663 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001664
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001665 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001666 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1667
1668 tests = [
1669 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001670 [ '', input_lines ],
1671 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1672 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1673 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001674 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001675 encodings = (
1676 'utf-8', 'latin-1',
1677 'utf-16', 'utf-16-le', 'utf-16-be',
1678 'utf-32', 'utf-32-le', 'utf-32-be',
1679 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001680
Guido van Rossum8358db22007-08-18 21:39:55 +00001681 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001682 # character in TextIOWrapper._pending_line.
1683 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001684 # XXX: str.encode() should return bytes
1685 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001686 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001687 for bufsize in range(1, 10):
1688 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001689 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1690 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001691 encoding=encoding)
1692 if do_reads:
1693 got_lines = []
1694 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001695 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001696 if c2 == '':
1697 break
1698 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001699 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001700 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001701 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001702
1703 for got_line, exp_line in zip(got_lines, exp_lines):
1704 self.assertEquals(got_line, exp_line)
1705 self.assertEquals(len(got_lines), len(exp_lines))
1706
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001707 def test_newlines_input(self):
1708 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001709 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1710 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001711 (None, normalized.decode("ascii").splitlines(True)),
1712 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001713 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1714 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1715 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001716 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001717 buf = self.BytesIO(testdata)
1718 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001719 self.assertEquals(txt.readlines(), expected)
1720 txt.seek(0)
1721 self.assertEquals(txt.read(), "".join(expected))
1722
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001723 def test_newlines_output(self):
1724 testdict = {
1725 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1726 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1727 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1728 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1729 }
1730 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1731 for newline, expected in tests:
1732 buf = self.BytesIO()
1733 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1734 txt.write("AAA\nB")
1735 txt.write("BB\nCCC\n")
1736 txt.write("X\rY\r\nZ")
1737 txt.flush()
1738 self.assertEquals(buf.closed, False)
1739 self.assertEquals(buf.getvalue(), expected)
1740
1741 def test_destructor(self):
1742 l = []
1743 base = self.BytesIO
1744 class MyBytesIO(base):
1745 def close(self):
1746 l.append(self.getvalue())
1747 base.close(self)
1748 b = MyBytesIO()
1749 t = self.TextIOWrapper(b, encoding="ascii")
1750 t.write("abc")
1751 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001752 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001753 self.assertEquals([b"abc"], l)
1754
1755 def test_override_destructor(self):
1756 record = []
1757 class MyTextIO(self.TextIOWrapper):
1758 def __del__(self):
1759 record.append(1)
1760 try:
1761 f = super().__del__
1762 except AttributeError:
1763 pass
1764 else:
1765 f()
1766 def close(self):
1767 record.append(2)
1768 super().close()
1769 def flush(self):
1770 record.append(3)
1771 super().flush()
1772 b = self.BytesIO()
1773 t = MyTextIO(b, encoding="ascii")
1774 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001775 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001776 self.assertEqual(record, [1, 2, 3])
1777
1778 def test_error_through_destructor(self):
1779 # Test that the exception state is not modified by a destructor,
1780 # even if close() fails.
1781 rawio = self.CloseFailureIO()
1782 def f():
1783 self.TextIOWrapper(rawio).xyzzy
1784 with support.captured_output("stderr") as s:
1785 self.assertRaises(AttributeError, f)
1786 s = s.getvalue().strip()
1787 if s:
1788 # The destructor *may* have printed an unraisable error, check it
1789 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001790 self.assertTrue(s.startswith("Exception IOError: "), s)
1791 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001792
Guido van Rossum9b76da62007-04-11 01:09:03 +00001793 # Systematic tests of the text I/O API
1794
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001795 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001796 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1797 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001798 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001799 f._CHUNK_SIZE = chunksize
1800 self.assertEquals(f.write("abc"), 3)
1801 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001802 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001803 f._CHUNK_SIZE = chunksize
1804 self.assertEquals(f.tell(), 0)
1805 self.assertEquals(f.read(), "abc")
1806 cookie = f.tell()
1807 self.assertEquals(f.seek(0), 0)
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001808 self.assertEquals(f.read(None), "abc")
1809 f.seek(0)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001810 self.assertEquals(f.read(2), "ab")
1811 self.assertEquals(f.read(1), "c")
1812 self.assertEquals(f.read(1), "")
1813 self.assertEquals(f.read(), "")
1814 self.assertEquals(f.tell(), cookie)
1815 self.assertEquals(f.seek(0), 0)
1816 self.assertEquals(f.seek(0, 2), cookie)
1817 self.assertEquals(f.write("def"), 3)
1818 self.assertEquals(f.seek(cookie), cookie)
1819 self.assertEquals(f.read(), "def")
1820 if enc.startswith("utf"):
1821 self.multi_line_test(f, enc)
1822 f.close()
1823
1824 def multi_line_test(self, f, enc):
1825 f.seek(0)
1826 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001827 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001828 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001829 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001830 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001831 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001832 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001833 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001834 wlines.append((f.tell(), line))
1835 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001836 f.seek(0)
1837 rlines = []
1838 while True:
1839 pos = f.tell()
1840 line = f.readline()
1841 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001842 break
1843 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +00001844 self.assertEquals(rlines, wlines)
1845
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001846 def test_telling(self):
1847 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001848 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001849 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001850 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001851 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001852 p2 = f.tell()
1853 f.seek(0)
1854 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001855 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001856 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001857 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001858 self.assertEquals(f.tell(), p2)
1859 f.seek(0)
1860 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001861 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001862 self.assertRaises(IOError, f.tell)
1863 self.assertEquals(f.tell(), p2)
1864 f.close()
1865
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001866 def test_seeking(self):
1867 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001868 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001869 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001870 prefix = bytes(u_prefix.encode("utf-8"))
1871 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001872 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001873 suffix = bytes(u_suffix.encode("utf-8"))
1874 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001875 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001876 f.write(line*2)
1877 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001878 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001879 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001880 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001881 self.assertEquals(f.tell(), prefix_size)
1882 self.assertEquals(f.readline(), u_suffix)
1883
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001884 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00001885 # Regression test for a specific bug
1886 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001887 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001888 f.write(data)
1889 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001890 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001891 f._CHUNK_SIZE # Just test that it exists
1892 f._CHUNK_SIZE = 2
1893 f.readline()
1894 f.tell()
1895
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001896 def test_seek_and_tell(self):
1897 #Test seek/tell using the StatefulIncrementalDecoder.
1898 # Make test faster by doing smaller seeks
1899 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001900
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001901 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001902 """Tell/seek to various points within a data stream and ensure
1903 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001904 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001905 f.write(data)
1906 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001907 f = self.open(support.TESTFN, encoding='test_decoder')
1908 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001909 decoded = f.read()
1910 f.close()
1911
Neal Norwitze2b07052008-03-18 19:52:05 +00001912 for i in range(min_pos, len(decoded) + 1): # seek positions
1913 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001914 f = self.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001915 self.assertEquals(f.read(i), decoded[:i])
1916 cookie = f.tell()
1917 self.assertEquals(f.read(j), decoded[i:i + j])
1918 f.seek(cookie)
1919 self.assertEquals(f.read(), decoded[i:])
1920 f.close()
1921
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001922 # Enable the test decoder.
1923 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001924
1925 # Run the tests.
1926 try:
1927 # Try each test case.
1928 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001929 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001930
1931 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001932 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1933 offset = CHUNK_SIZE - len(input)//2
1934 prefix = b'.'*offset
1935 # Don't bother seeking into the prefix (takes too long).
1936 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001937 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001938
1939 # Ensure our test decoder won't interfere with subsequent tests.
1940 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001941 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001942
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001943 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001944 data = "1234567890"
1945 tests = ("utf-16",
1946 "utf-16-le",
1947 "utf-16-be",
1948 "utf-32",
1949 "utf-32-le",
1950 "utf-32-be")
1951 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001952 buf = self.BytesIO()
1953 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001954 # Check if the BOM is written only once (see issue1753).
1955 f.write(data)
1956 f.write(data)
1957 f.seek(0)
1958 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00001959 f.seek(0)
1960 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001961 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1962
Benjamin Petersona1b49012009-03-31 23:11:32 +00001963 def test_unreadable(self):
1964 class UnReadable(self.BytesIO):
1965 def readable(self):
1966 return False
1967 txt = self.TextIOWrapper(UnReadable())
1968 self.assertRaises(IOError, txt.read)
1969
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001970 def test_read_one_by_one(self):
1971 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001972 reads = ""
1973 while True:
1974 c = txt.read(1)
1975 if not c:
1976 break
1977 reads += c
1978 self.assertEquals(reads, "AA\nBB")
1979
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001980 def test_readlines(self):
1981 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
1982 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
1983 txt.seek(0)
1984 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
1985 txt.seek(0)
1986 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
1987
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001988 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001989 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001990 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001991 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001992 reads = ""
1993 while True:
1994 c = txt.read(128)
1995 if not c:
1996 break
1997 reads += c
1998 self.assertEquals(reads, "A"*127+"\nB")
1999
2000 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002001 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002002
2003 # read one char at a time
2004 reads = ""
2005 while True:
2006 c = txt.read(1)
2007 if not c:
2008 break
2009 reads += c
2010 self.assertEquals(reads, self.normalized)
2011
2012 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002013 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002014 txt._CHUNK_SIZE = 4
2015
2016 reads = ""
2017 while True:
2018 c = txt.read(4)
2019 if not c:
2020 break
2021 reads += c
2022 self.assertEquals(reads, self.normalized)
2023
2024 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002025 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002026 txt._CHUNK_SIZE = 4
2027
2028 reads = txt.read(4)
2029 reads += txt.read(4)
2030 reads += txt.readline()
2031 reads += txt.readline()
2032 reads += txt.readline()
2033 self.assertEquals(reads, self.normalized)
2034
2035 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002036 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002037 txt._CHUNK_SIZE = 4
2038
2039 reads = txt.read(4)
2040 reads += txt.read()
2041 self.assertEquals(reads, self.normalized)
2042
2043 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002044 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002045 txt._CHUNK_SIZE = 4
2046
2047 reads = txt.read(4)
2048 pos = txt.tell()
2049 txt.seek(0)
2050 txt.seek(pos)
2051 self.assertEquals(txt.read(4), "BBB\n")
2052
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002053 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002054 buffer = self.BytesIO(self.testdata)
2055 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002056
2057 self.assertEqual(buffer.seekable(), txt.seekable())
2058
Antoine Pitroue4501852009-05-14 18:55:55 +00002059 def test_append_bom(self):
2060 # The BOM is not written again when appending to a non-empty file
2061 filename = support.TESTFN
2062 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2063 with self.open(filename, 'w', encoding=charset) as f:
2064 f.write('aaa')
2065 pos = f.tell()
2066 with self.open(filename, 'rb') as f:
2067 self.assertEquals(f.read(), 'aaa'.encode(charset))
2068
2069 with self.open(filename, 'a', encoding=charset) as f:
2070 f.write('xxx')
2071 with self.open(filename, 'rb') as f:
2072 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
2073
2074 def test_seek_bom(self):
2075 # Same test, but when seeking manually
2076 filename = support.TESTFN
2077 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2078 with self.open(filename, 'w', encoding=charset) as f:
2079 f.write('aaa')
2080 pos = f.tell()
2081 with self.open(filename, 'r+', encoding=charset) as f:
2082 f.seek(pos)
2083 f.write('zzz')
2084 f.seek(0)
2085 f.write('bbb')
2086 with self.open(filename, 'rb') as f:
2087 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2088
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002089 def test_errors_property(self):
2090 with self.open(support.TESTFN, "w") as f:
2091 self.assertEqual(f.errors, "strict")
2092 with self.open(support.TESTFN, "w", errors="replace") as f:
2093 self.assertEqual(f.errors, "replace")
2094
Antoine Pitroue4501852009-05-14 18:55:55 +00002095
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002096 def test_threads_write(self):
2097 # Issue6750: concurrent writes could duplicate data
2098 event = threading.Event()
2099 with self.open(support.TESTFN, "w", buffering=1) as f:
2100 def run(n):
2101 text = "Thread%03d\n" % n
2102 event.wait()
2103 f.write(text)
2104 threads = [threading.Thread(target=lambda n=x: run(n))
2105 for x in range(20)]
2106 for t in threads:
2107 t.start()
2108 time.sleep(0.02)
2109 event.set()
2110 for t in threads:
2111 t.join()
2112 with self.open(support.TESTFN) as f:
2113 content = f.read()
2114 for n in range(20):
2115 self.assertEquals(content.count("Thread%03d\n" % n), 1)
2116
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002117class CTextIOWrapperTest(TextIOWrapperTest):
2118
2119 def test_initialization(self):
2120 r = self.BytesIO(b"\xc3\xa9\n\n")
2121 b = self.BufferedReader(r, 1000)
2122 t = self.TextIOWrapper(b)
2123 self.assertRaises(TypeError, t.__init__, b, newline=42)
2124 self.assertRaises(ValueError, t.read)
2125 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2126 self.assertRaises(ValueError, t.read)
2127
2128 def test_garbage_collection(self):
2129 # C TextIOWrapper objects are collected, and collecting them flushes
2130 # all data to disk.
2131 # The Python version has __del__, so it ends in gc.garbage instead.
2132 rawio = io.FileIO(support.TESTFN, "wb")
2133 b = self.BufferedWriter(rawio)
2134 t = self.TextIOWrapper(b, encoding="ascii")
2135 t.write("456def")
2136 t.x = t
2137 wr = weakref.ref(t)
2138 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002139 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002140 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002141 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002142 self.assertEqual(f.read(), b"456def")
2143
2144class PyTextIOWrapperTest(TextIOWrapperTest):
2145 pass
2146
2147
2148class IncrementalNewlineDecoderTest(unittest.TestCase):
2149
2150 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002151 # UTF-8 specific tests for a newline decoder
2152 def _check_decode(b, s, **kwargs):
2153 # We exercise getstate() / setstate() as well as decode()
2154 state = decoder.getstate()
2155 self.assertEquals(decoder.decode(b, **kwargs), s)
2156 decoder.setstate(state)
2157 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002158
Antoine Pitrou180a3362008-12-14 16:36:46 +00002159 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002160
Antoine Pitrou180a3362008-12-14 16:36:46 +00002161 _check_decode(b'\xe8', "")
2162 _check_decode(b'\xa2', "")
2163 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002164
Antoine Pitrou180a3362008-12-14 16:36:46 +00002165 _check_decode(b'\xe8', "")
2166 _check_decode(b'\xa2', "")
2167 _check_decode(b'\x88', "\u8888")
2168
2169 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002170 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2171
Antoine Pitrou180a3362008-12-14 16:36:46 +00002172 decoder.reset()
2173 _check_decode(b'\n', "\n")
2174 _check_decode(b'\r', "")
2175 _check_decode(b'', "\n", final=True)
2176 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002177
Antoine Pitrou180a3362008-12-14 16:36:46 +00002178 _check_decode(b'\r', "")
2179 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002180
Antoine Pitrou180a3362008-12-14 16:36:46 +00002181 _check_decode(b'\r\r\n', "\n\n")
2182 _check_decode(b'\r', "")
2183 _check_decode(b'\r', "\n")
2184 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002185
Antoine Pitrou180a3362008-12-14 16:36:46 +00002186 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2187 _check_decode(b'\xe8\xa2\x88', "\u8888")
2188 _check_decode(b'\n', "\n")
2189 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2190 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002191
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002192 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002193 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002194 if encoding is not None:
2195 encoder = codecs.getincrementalencoder(encoding)()
2196 def _decode_bytewise(s):
2197 # Decode one byte at a time
2198 for b in encoder.encode(s):
2199 result.append(decoder.decode(bytes([b])))
2200 else:
2201 encoder = None
2202 def _decode_bytewise(s):
2203 # Decode one char at a time
2204 for c in s:
2205 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002206 self.assertEquals(decoder.newlines, None)
2207 _decode_bytewise("abc\n\r")
2208 self.assertEquals(decoder.newlines, '\n')
2209 _decode_bytewise("\nabc")
2210 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2211 _decode_bytewise("abc\r")
2212 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2213 _decode_bytewise("abc")
2214 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2215 _decode_bytewise("abc\r")
2216 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2217 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002218 input = "abc"
2219 if encoder is not None:
2220 encoder.reset()
2221 input = encoder.encode(input)
2222 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002223 self.assertEquals(decoder.newlines, None)
2224
2225 def test_newline_decoder(self):
2226 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002227 # None meaning the IncrementalNewlineDecoder takes unicode input
2228 # rather than bytes input
2229 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002230 'utf-16', 'utf-16-le', 'utf-16-be',
2231 'utf-32', 'utf-32-le', 'utf-32-be',
2232 )
2233 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002234 decoder = enc and codecs.getincrementaldecoder(enc)()
2235 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2236 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002237 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002238 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2239 self.check_newline_decoding_utf8(decoder)
2240
Antoine Pitrou66913e22009-03-06 23:40:56 +00002241 def test_newline_bytes(self):
2242 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2243 def _check(dec):
2244 self.assertEquals(dec.newlines, None)
2245 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2246 self.assertEquals(dec.newlines, None)
2247 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2248 self.assertEquals(dec.newlines, None)
2249 dec = self.IncrementalNewlineDecoder(None, translate=False)
2250 _check(dec)
2251 dec = self.IncrementalNewlineDecoder(None, translate=True)
2252 _check(dec)
2253
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002254class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2255 pass
2256
2257class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2258 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002259
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002260
Guido van Rossum01a27522007-03-07 01:00:12 +00002261# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002262
Guido van Rossum5abbf752007-08-27 17:39:33 +00002263class MiscIOTest(unittest.TestCase):
2264
Barry Warsaw40e82462008-11-20 20:14:50 +00002265 def tearDown(self):
2266 support.unlink(support.TESTFN)
2267
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002268 def test___all__(self):
2269 for name in self.io.__all__:
2270 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002271 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002272 if name == "open":
2273 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002274 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002275 self.assertTrue(issubclass(obj, Exception), name)
2276 elif not name.startswith("SEEK_"):
2277 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002278
Barry Warsaw40e82462008-11-20 20:14:50 +00002279 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002280 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002281 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002282 f.close()
2283
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002284 f = self.open(support.TESTFN, "U")
Barry Warsaw40e82462008-11-20 20:14:50 +00002285 self.assertEquals(f.name, support.TESTFN)
2286 self.assertEquals(f.buffer.name, support.TESTFN)
2287 self.assertEquals(f.buffer.raw.name, support.TESTFN)
2288 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002289 self.assertEquals(f.buffer.mode, "rb")
2290 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002291 f.close()
2292
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002293 f = self.open(support.TESTFN, "w+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002294 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002295 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2296 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002297
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002298 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002299 self.assertEquals(g.mode, "wb")
2300 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002301 self.assertEquals(g.name, f.fileno())
2302 self.assertEquals(g.raw.name, f.fileno())
2303 f.close()
2304 g.close()
2305
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002306 def test_io_after_close(self):
2307 for kwargs in [
2308 {"mode": "w"},
2309 {"mode": "wb"},
2310 {"mode": "w", "buffering": 1},
2311 {"mode": "w", "buffering": 2},
2312 {"mode": "wb", "buffering": 0},
2313 {"mode": "r"},
2314 {"mode": "rb"},
2315 {"mode": "r", "buffering": 1},
2316 {"mode": "r", "buffering": 2},
2317 {"mode": "rb", "buffering": 0},
2318 {"mode": "w+"},
2319 {"mode": "w+b"},
2320 {"mode": "w+", "buffering": 1},
2321 {"mode": "w+", "buffering": 2},
2322 {"mode": "w+b", "buffering": 0},
2323 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002324 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002325 f.close()
2326 self.assertRaises(ValueError, f.flush)
2327 self.assertRaises(ValueError, f.fileno)
2328 self.assertRaises(ValueError, f.isatty)
2329 self.assertRaises(ValueError, f.__iter__)
2330 if hasattr(f, "peek"):
2331 self.assertRaises(ValueError, f.peek, 1)
2332 self.assertRaises(ValueError, f.read)
2333 if hasattr(f, "read1"):
2334 self.assertRaises(ValueError, f.read1, 1024)
2335 if hasattr(f, "readinto"):
2336 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2337 self.assertRaises(ValueError, f.readline)
2338 self.assertRaises(ValueError, f.readlines)
2339 self.assertRaises(ValueError, f.seek, 0)
2340 self.assertRaises(ValueError, f.tell)
2341 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002342 self.assertRaises(ValueError, f.write,
2343 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002344 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002345 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002346
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002347 def test_blockingioerror(self):
2348 # Various BlockingIOError issues
2349 self.assertRaises(TypeError, self.BlockingIOError)
2350 self.assertRaises(TypeError, self.BlockingIOError, 1)
2351 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2352 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2353 b = self.BlockingIOError(1, "")
2354 self.assertEqual(b.characters_written, 0)
2355 class C(str):
2356 pass
2357 c = C("")
2358 b = self.BlockingIOError(1, c)
2359 c.b = b
2360 b.c = c
2361 wr = weakref.ref(c)
2362 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002363 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002364 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002365
2366 def test_abcs(self):
2367 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002368 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2369 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2370 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2371 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002372
2373 def _check_abc_inheritance(self, abcmodule):
2374 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002375 self.assertIsInstance(f, abcmodule.IOBase)
2376 self.assertIsInstance(f, abcmodule.RawIOBase)
2377 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2378 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002379 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002380 self.assertIsInstance(f, abcmodule.IOBase)
2381 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2382 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2383 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002384 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002385 self.assertIsInstance(f, abcmodule.IOBase)
2386 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2387 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2388 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002389
2390 def test_abc_inheritance(self):
2391 # Test implementations inherit from their respective ABCs
2392 self._check_abc_inheritance(self)
2393
2394 def test_abc_inheritance_official(self):
2395 # Test implementations inherit from the official ABCs of the
2396 # baseline "io" module.
2397 self._check_abc_inheritance(io)
2398
2399class CMiscIOTest(MiscIOTest):
2400 io = io
2401
2402class PyMiscIOTest(MiscIOTest):
2403 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002404
Guido van Rossum28524c72007-02-27 05:47:44 +00002405def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002406 tests = (CIOTest, PyIOTest,
2407 CBufferedReaderTest, PyBufferedReaderTest,
2408 CBufferedWriterTest, PyBufferedWriterTest,
2409 CBufferedRWPairTest, PyBufferedRWPairTest,
2410 CBufferedRandomTest, PyBufferedRandomTest,
2411 StatefulIncrementalDecoderTest,
2412 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2413 CTextIOWrapperTest, PyTextIOWrapperTest,
2414 CMiscIOTest, PyMiscIOTest,)
2415
2416 # Put the namespaces of the IO module we are testing and some useful mock
2417 # classes in the __dict__ of each test.
2418 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
2419 MockNonBlockWriterIO)
2420 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2421 c_io_ns = {name : getattr(io, name) for name in all_members}
2422 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2423 globs = globals()
2424 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2425 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2426 # Avoid turning open into a bound method.
2427 py_io_ns["open"] = pyio.OpenWrapper
2428 for test in tests:
2429 if test.__name__.startswith("C"):
2430 for name, obj in c_io_ns.items():
2431 setattr(test, name, obj)
2432 elif test.__name__.startswith("Py"):
2433 for name, obj in py_io_ns.items():
2434 setattr(test, name, obj)
2435
2436 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002437
2438if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002439 test_main()