blob: d5a6a4f74c332083b7e91067f112b5e08e052e5f [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000029import weakref
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import abc
Georg Brandl1b37e872010-03-14 10:45:50 +000031from itertools import cycle, count
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000032from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000033from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000034
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000035import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000036import io # C implementation of io
37import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000038
Guido van Rossuma9e20242007-03-08 00:43:48 +000039
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000040def _default_chunk_size():
41 """Get the default TextIOWrapper chunk size"""
42 with open(__file__, "r", encoding="latin1") as f:
43 return f._CHUNK_SIZE
44
45
46class MockRawIO:
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000048 def __init__(self, read_stack=()):
49 self._read_stack = list(read_stack)
50 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000051 self._reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000052
53 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000054 self._reads += 1
Guido van Rossum68bbcd22007-02-27 17:19:33 +000055 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000056 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000057 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000058 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000059
Guido van Rossum01a27522007-03-07 01:00:12 +000060 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000061 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000062 return len(b)
63
64 def writable(self):
65 return True
66
Guido van Rossum68bbcd22007-02-27 17:19:33 +000067 def fileno(self):
68 return 42
69
70 def readable(self):
71 return True
72
Guido van Rossum01a27522007-03-07 01:00:12 +000073 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000074 return True
75
Guido van Rossum01a27522007-03-07 01:00:12 +000076 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000077 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000078
79 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000080 return 0 # same comment as above
81
82 def readinto(self, buf):
83 self._reads += 1
84 max_len = len(buf)
85 try:
86 data = self._read_stack[0]
87 except IndexError:
88 return 0
89 if data is None:
90 del self._read_stack[0]
91 return None
92 n = len(data)
93 if len(data) <= max_len:
94 del self._read_stack[0]
95 buf[:n] = data
96 return n
97 else:
98 buf[:] = data[:max_len]
99 self._read_stack[0] = data[max_len:]
100 return max_len
101
102 def truncate(self, pos=None):
103 return pos
104
105class CMockRawIO(MockRawIO, io.RawIOBase):
106 pass
107
108class PyMockRawIO(MockRawIO, pyio.RawIOBase):
109 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000110
Guido van Rossuma9e20242007-03-08 00:43:48 +0000111
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000112class MisbehavedRawIO(MockRawIO):
113 def write(self, b):
114 return super().write(b) * 2
115
116 def read(self, n=None):
117 return super().read(n) * 2
118
119 def seek(self, pos, whence):
120 return -123
121
122 def tell(self):
123 return -456
124
125 def readinto(self, buf):
126 super().readinto(buf)
127 return len(buf) * 5
128
129class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
130 pass
131
132class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
133 pass
134
135
136class CloseFailureIO(MockRawIO):
137 closed = 0
138
139 def close(self):
140 if not self.closed:
141 self.closed = 1
142 raise IOError
143
144class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
145 pass
146
147class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
148 pass
149
150
151class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000152
153 def __init__(self, data):
154 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000155 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000156
157 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000158 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000159 self.read_history.append(None if res is None else len(res))
160 return res
161
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000162 def readinto(self, b):
163 res = super().readinto(b)
164 self.read_history.append(res)
165 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000166
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000167class CMockFileIO(MockFileIO, io.BytesIO):
168 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000169
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000170class PyMockFileIO(MockFileIO, pyio.BytesIO):
171 pass
172
173
174class MockNonBlockWriterIO:
175
176 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000177 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000178 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000179
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 def pop_written(self):
181 s = b"".join(self._write_stack)
182 self._write_stack[:] = []
183 return s
184
185 def block_on(self, char):
186 """Block when a given char is encountered."""
187 self._blocker_char = char
188
189 def readable(self):
190 return True
191
192 def seekable(self):
193 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000194
Guido van Rossum01a27522007-03-07 01:00:12 +0000195 def writable(self):
196 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000197
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000198 def write(self, b):
199 b = bytes(b)
200 n = -1
201 if self._blocker_char:
202 try:
203 n = b.index(self._blocker_char)
204 except ValueError:
205 pass
206 else:
207 self._blocker_char = None
208 self._write_stack.append(b[:n])
209 raise self.BlockingIOError(0, "test blocking", n)
210 self._write_stack.append(b)
211 return len(b)
212
213class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
214 BlockingIOError = io.BlockingIOError
215
216class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
217 BlockingIOError = pyio.BlockingIOError
218
Guido van Rossuma9e20242007-03-08 00:43:48 +0000219
Guido van Rossum28524c72007-02-27 05:47:44 +0000220class IOTest(unittest.TestCase):
221
Neal Norwitze7789b12008-03-24 06:18:09 +0000222 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000223 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000224
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000225 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000226 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000227
Guido van Rossum28524c72007-02-27 05:47:44 +0000228 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000229 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000230 f.truncate(0)
231 self.assertEqual(f.tell(), 5)
232 f.seek(0)
233
234 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000235 self.assertEqual(f.seek(0), 0)
236 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000237 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000238 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000239 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000240 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000241 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000242 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000243 self.assertEqual(f.seek(-1, 2), 13)
244 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000245
Guido van Rossum87429772007-04-10 21:06:59 +0000246 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000247 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000248 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000249
Guido van Rossum9b76da62007-04-11 01:09:03 +0000250 def read_ops(self, f, buffered=False):
251 data = f.read(5)
252 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000253 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000254 self.assertEqual(f.readinto(data), 5)
255 self.assertEqual(data, b" worl")
256 self.assertEqual(f.readinto(data), 2)
257 self.assertEqual(len(data), 5)
258 self.assertEqual(data[:2], b"d\n")
259 self.assertEqual(f.seek(0), 0)
260 self.assertEqual(f.read(20), b"hello world\n")
261 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000262 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000263 self.assertEqual(f.seek(-6, 2), 6)
264 self.assertEqual(f.read(5), b"world")
265 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000266 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000267 self.assertEqual(f.seek(-6, 1), 5)
268 self.assertEqual(f.read(5), b" worl")
269 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000270 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000271 if buffered:
272 f.seek(0)
273 self.assertEqual(f.read(), b"hello world\n")
274 f.seek(6)
275 self.assertEqual(f.read(), b"world\n")
276 self.assertEqual(f.read(), b"")
277
Guido van Rossum34d69e52007-04-10 20:08:41 +0000278 LARGE = 2**31
279
Guido van Rossum53807da2007-04-10 19:01:47 +0000280 def large_file_ops(self, f):
281 assert f.readable()
282 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000283 self.assertEqual(f.seek(self.LARGE), self.LARGE)
284 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000285 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000286 self.assertEqual(f.tell(), self.LARGE + 3)
287 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000288 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000289 self.assertEqual(f.tell(), self.LARGE + 2)
290 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000291 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000292 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000293 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
294 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000295 self.assertEqual(f.read(2), b"x")
296
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000297 def test_invalid_operations(self):
298 # Try writing on a file opened in read mode and vice-versa.
299 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000300 with self.open(support.TESTFN, mode) as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000301 self.assertRaises(IOError, fp.read)
302 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000303 with self.open(support.TESTFN, "rb") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000304 self.assertRaises(IOError, fp.write, b"blah")
305 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000306 with self.open(support.TESTFN, "r") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000307 self.assertRaises(IOError, fp.write, "blah")
308 self.assertRaises(IOError, fp.writelines, ["blah\n"])
309
Guido van Rossum28524c72007-02-27 05:47:44 +0000310 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000311 with self.open(support.TESTFN, "wb", buffering=0) as f:
312 self.assertEqual(f.readable(), False)
313 self.assertEqual(f.writable(), True)
314 self.assertEqual(f.seekable(), True)
315 self.write_ops(f)
316 with self.open(support.TESTFN, "rb", buffering=0) as f:
317 self.assertEqual(f.readable(), True)
318 self.assertEqual(f.writable(), False)
319 self.assertEqual(f.seekable(), True)
320 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000321
Guido van Rossum87429772007-04-10 21:06:59 +0000322 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000323 with self.open(support.TESTFN, "wb") as f:
324 self.assertEqual(f.readable(), False)
325 self.assertEqual(f.writable(), True)
326 self.assertEqual(f.seekable(), True)
327 self.write_ops(f)
328 with self.open(support.TESTFN, "rb") as f:
329 self.assertEqual(f.readable(), True)
330 self.assertEqual(f.writable(), False)
331 self.assertEqual(f.seekable(), True)
332 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000333
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000334 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000335 with self.open(support.TESTFN, "wb") as f:
336 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
337 with self.open(support.TESTFN, "rb") as f:
338 self.assertEqual(f.readline(), b"abc\n")
339 self.assertEqual(f.readline(10), b"def\n")
340 self.assertEqual(f.readline(2), b"xy")
341 self.assertEqual(f.readline(4), b"zzy\n")
342 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000343 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000344 self.assertRaises(TypeError, f.readline, 5.3)
345 with self.open(support.TESTFN, "r") as f:
346 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000347
Guido van Rossum28524c72007-02-27 05:47:44 +0000348 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000349 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000350 self.write_ops(f)
351 data = f.getvalue()
352 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000353 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000354 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000355
Guido van Rossum53807da2007-04-10 19:01:47 +0000356 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000357 # On Windows and Mac OSX this test comsumes large resources; It takes
358 # a long time to build the >2GB file and takes >2GB of disk space
359 # therefore the resource must be enabled to run this test.
360 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000361 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000362 print("\nTesting large file ops skipped on %s." % sys.platform,
363 file=sys.stderr)
364 print("It requires %d bytes and a long time." % self.LARGE,
365 file=sys.stderr)
366 print("Use 'regrtest.py -u largefile test_io' to run it.",
367 file=sys.stderr)
368 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000369 with self.open(support.TESTFN, "w+b", 0) as f:
370 self.large_file_ops(f)
371 with self.open(support.TESTFN, "w+b") as f:
372 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000373
374 def test_with_open(self):
375 for bufsize in (0, 1, 100):
376 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000377 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000378 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000379 self.assertEqual(f.closed, True)
380 f = None
381 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000382 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000383 1/0
384 except ZeroDivisionError:
385 self.assertEqual(f.closed, True)
386 else:
387 self.fail("1/0 didn't raise an exception")
388
Antoine Pitrou08838b62009-01-21 00:55:13 +0000389 # issue 5008
390 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000391 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000392 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000393 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000394 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000395 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000396 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000397 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000398 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000399
Guido van Rossum87429772007-04-10 21:06:59 +0000400 def test_destructor(self):
401 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000402 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000403 def __del__(self):
404 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000405 try:
406 f = super().__del__
407 except AttributeError:
408 pass
409 else:
410 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000411 def close(self):
412 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000413 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000414 def flush(self):
415 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000416 super().flush()
417 f = MyFileIO(support.TESTFN, "wb")
418 f.write(b"xxx")
419 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000420 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000421 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000422 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000423 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000424
425 def _check_base_destructor(self, base):
426 record = []
427 class MyIO(base):
428 def __init__(self):
429 # This exercises the availability of attributes on object
430 # destruction.
431 # (in the C version, close() is called by the tp_dealloc
432 # function, not by __del__)
433 self.on_del = 1
434 self.on_close = 2
435 self.on_flush = 3
436 def __del__(self):
437 record.append(self.on_del)
438 try:
439 f = super().__del__
440 except AttributeError:
441 pass
442 else:
443 f()
444 def close(self):
445 record.append(self.on_close)
446 super().close()
447 def flush(self):
448 record.append(self.on_flush)
449 super().flush()
450 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000451 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000452 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000453 self.assertEqual(record, [1, 2, 3])
454
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000455 def test_IOBase_destructor(self):
456 self._check_base_destructor(self.IOBase)
457
458 def test_RawIOBase_destructor(self):
459 self._check_base_destructor(self.RawIOBase)
460
461 def test_BufferedIOBase_destructor(self):
462 self._check_base_destructor(self.BufferedIOBase)
463
464 def test_TextIOBase_destructor(self):
465 self._check_base_destructor(self.TextIOBase)
466
Guido van Rossum87429772007-04-10 21:06:59 +0000467 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000468 with self.open(support.TESTFN, "wb") as f:
469 f.write(b"xxx")
470 with self.open(support.TESTFN, "rb") as f:
471 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000472
Guido van Rossumd4103952007-04-12 05:44:49 +0000473 def test_array_writes(self):
474 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000475 n = len(a.tostring())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000476 with self.open(support.TESTFN, "wb", 0) as f:
477 self.assertEqual(f.write(a), n)
478 with self.open(support.TESTFN, "wb") as f:
479 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000480
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000481 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000482 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000483 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000484
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000485 def test_read_closed(self):
486 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000487 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000488 with self.open(support.TESTFN, "r") as f:
489 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000490 self.assertEqual(file.read(), "egg\n")
491 file.seek(0)
492 file.close()
493 self.assertRaises(ValueError, file.read)
494
495 def test_no_closefd_with_filename(self):
496 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000497 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000498
499 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000500 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000501 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000502 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000503 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000504 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000505 self.assertEqual(file.buffer.raw.closefd, False)
506
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000507 def test_garbage_collection(self):
508 # FileIO objects are collected, and collecting them flushes
509 # all data to disk.
510 f = self.FileIO(support.TESTFN, "wb")
511 f.write(b"abcxxx")
512 f.f = f
513 wr = weakref.ref(f)
514 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000515 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000516 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000517 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000518 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000519
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000520 def test_unbounded_file(self):
521 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
522 zero = "/dev/zero"
523 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000524 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000525 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000526 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000527 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000528 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000529 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000530 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000531 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000532 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000533 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000534 self.assertRaises(OverflowError, f.read)
535
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000536class CIOTest(IOTest):
537 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000538
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000539class PyIOTest(IOTest):
540 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000541
Guido van Rossuma9e20242007-03-08 00:43:48 +0000542
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000543class CommonBufferedTests:
544 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
545
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000546 def test_detach(self):
547 raw = self.MockRawIO()
548 buf = self.tp(raw)
549 self.assertIs(buf.detach(), raw)
550 self.assertRaises(ValueError, buf.detach)
551
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000552 def test_fileno(self):
553 rawio = self.MockRawIO()
554 bufio = self.tp(rawio)
555
556 self.assertEquals(42, bufio.fileno())
557
558 def test_no_fileno(self):
559 # XXX will we always have fileno() function? If so, kill
560 # this test. Else, write it.
561 pass
562
563 def test_invalid_args(self):
564 rawio = self.MockRawIO()
565 bufio = self.tp(rawio)
566 # Invalid whence
567 self.assertRaises(ValueError, bufio.seek, 0, -1)
568 self.assertRaises(ValueError, bufio.seek, 0, 3)
569
570 def test_override_destructor(self):
571 tp = self.tp
572 record = []
573 class MyBufferedIO(tp):
574 def __del__(self):
575 record.append(1)
576 try:
577 f = super().__del__
578 except AttributeError:
579 pass
580 else:
581 f()
582 def close(self):
583 record.append(2)
584 super().close()
585 def flush(self):
586 record.append(3)
587 super().flush()
588 rawio = self.MockRawIO()
589 bufio = MyBufferedIO(rawio)
590 writable = bufio.writable()
591 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000592 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000593 if writable:
594 self.assertEqual(record, [1, 2, 3])
595 else:
596 self.assertEqual(record, [1, 2])
597
598 def test_context_manager(self):
599 # Test usability as a context manager
600 rawio = self.MockRawIO()
601 bufio = self.tp(rawio)
602 def _with():
603 with bufio:
604 pass
605 _with()
606 # bufio should now be closed, and using it a second time should raise
607 # a ValueError.
608 self.assertRaises(ValueError, _with)
609
610 def test_error_through_destructor(self):
611 # Test that the exception state is not modified by a destructor,
612 # even if close() fails.
613 rawio = self.CloseFailureIO()
614 def f():
615 self.tp(rawio).xyzzy
616 with support.captured_output("stderr") as s:
617 self.assertRaises(AttributeError, f)
618 s = s.getvalue().strip()
619 if s:
620 # The destructor *may* have printed an unraisable error, check it
621 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000622 self.assertTrue(s.startswith("Exception IOError: "), s)
623 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000624
Antoine Pitrou716c4442009-05-23 19:04:03 +0000625 def test_repr(self):
626 raw = self.MockRawIO()
627 b = self.tp(raw)
628 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
629 self.assertEqual(repr(b), "<%s>" % clsname)
630 raw.name = "dummy"
631 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
632 raw.name = b"dummy"
633 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
634
Guido van Rossum78892e42007-04-06 17:31:18 +0000635
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000636class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
637 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000638
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000639 def test_constructor(self):
640 rawio = self.MockRawIO([b"abc"])
641 bufio = self.tp(rawio)
642 bufio.__init__(rawio)
643 bufio.__init__(rawio, buffer_size=1024)
644 bufio.__init__(rawio, buffer_size=16)
645 self.assertEquals(b"abc", bufio.read())
646 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
647 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
648 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
649 rawio = self.MockRawIO([b"abc"])
650 bufio.__init__(rawio)
651 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000652
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000653 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000654 for arg in (None, 7):
655 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
656 bufio = self.tp(rawio)
657 self.assertEquals(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000658 # Invalid args
659 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000660
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000661 def test_read1(self):
662 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
663 bufio = self.tp(rawio)
664 self.assertEquals(b"a", bufio.read(1))
665 self.assertEquals(b"b", bufio.read1(1))
666 self.assertEquals(rawio._reads, 1)
667 self.assertEquals(b"c", bufio.read1(100))
668 self.assertEquals(rawio._reads, 1)
669 self.assertEquals(b"d", bufio.read1(100))
670 self.assertEquals(rawio._reads, 2)
671 self.assertEquals(b"efg", bufio.read1(100))
672 self.assertEquals(rawio._reads, 3)
673 self.assertEquals(b"", bufio.read1(100))
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000674 self.assertEquals(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000675 # Invalid args
676 self.assertRaises(ValueError, bufio.read1, -1)
677
678 def test_readinto(self):
679 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
680 bufio = self.tp(rawio)
681 b = bytearray(2)
682 self.assertEquals(bufio.readinto(b), 2)
683 self.assertEquals(b, b"ab")
684 self.assertEquals(bufio.readinto(b), 2)
685 self.assertEquals(b, b"cd")
686 self.assertEquals(bufio.readinto(b), 2)
687 self.assertEquals(b, b"ef")
688 self.assertEquals(bufio.readinto(b), 1)
689 self.assertEquals(b, b"gf")
690 self.assertEquals(bufio.readinto(b), 0)
691 self.assertEquals(b, b"gf")
692
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000693 def test_readlines(self):
694 def bufio():
695 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
696 return self.tp(rawio)
697 self.assertEquals(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
698 self.assertEquals(bufio().readlines(5), [b"abc\n", b"d\n"])
699 self.assertEquals(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
700
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000701 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000702 data = b"abcdefghi"
703 dlen = len(data)
704
705 tests = [
706 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
707 [ 100, [ 3, 3, 3], [ dlen ] ],
708 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
709 ]
710
711 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000712 rawio = self.MockFileIO(data)
713 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000714 pos = 0
715 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000716 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000717 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000718 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000719 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000720
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000721 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000722 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000723 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
724 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000725
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000726 self.assertEquals(b"abcd", bufio.read(6))
727 self.assertEquals(b"e", bufio.read(1))
728 self.assertEquals(b"fg", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000729 self.assertEquals(b"", bufio.peek(1))
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000730 self.assertTrue(None is bufio.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000731 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000732
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000733 def test_read_past_eof(self):
734 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
735 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000736
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000737 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000738
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000739 def test_read_all(self):
740 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
741 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000742
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000743 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000744
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000745 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000746 try:
747 # Write out many bytes with exactly the same number of 0's,
748 # 1's... 255's. This will help us check that concurrent reading
749 # doesn't duplicate or forget contents.
750 N = 1000
751 l = list(range(256)) * N
752 random.shuffle(l)
753 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000754 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +0000755 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000756 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000757 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000758 errors = []
759 results = []
760 def f():
761 try:
762 # Intra-buffer read then buffer-flushing read
763 for n in cycle([1, 19]):
764 s = bufio.read(n)
765 if not s:
766 break
767 # list.append() is atomic
768 results.append(s)
769 except Exception as e:
770 errors.append(e)
771 raise
772 threads = [threading.Thread(target=f) for x in range(20)]
773 for t in threads:
774 t.start()
775 time.sleep(0.02) # yield
776 for t in threads:
777 t.join()
778 self.assertFalse(errors,
779 "the following exceptions were caught: %r" % errors)
780 s = b''.join(results)
781 for i in range(256):
782 c = bytes(bytearray([i]))
783 self.assertEqual(s.count(c), N)
784 finally:
785 support.unlink(support.TESTFN)
786
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000787 def test_misbehaved_io(self):
788 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
789 bufio = self.tp(rawio)
790 self.assertRaises(IOError, bufio.seek, 0)
791 self.assertRaises(IOError, bufio.tell)
792
793class CBufferedReaderTest(BufferedReaderTest):
794 tp = io.BufferedReader
795
796 def test_constructor(self):
797 BufferedReaderTest.test_constructor(self)
798 # The allocation can succeed on 32-bit builds, e.g. with more
799 # than 2GB RAM and a 64-bit kernel.
800 if sys.maxsize > 0x7FFFFFFF:
801 rawio = self.MockRawIO()
802 bufio = self.tp(rawio)
803 self.assertRaises((OverflowError, MemoryError, ValueError),
804 bufio.__init__, rawio, sys.maxsize)
805
806 def test_initialization(self):
807 rawio = self.MockRawIO([b"abc"])
808 bufio = self.tp(rawio)
809 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
810 self.assertRaises(ValueError, bufio.read)
811 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
812 self.assertRaises(ValueError, bufio.read)
813 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
814 self.assertRaises(ValueError, bufio.read)
815
816 def test_misbehaved_io_read(self):
817 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
818 bufio = self.tp(rawio)
819 # _pyio.BufferedReader seems to implement reading different, so that
820 # checking this is not so easy.
821 self.assertRaises(IOError, bufio.read, 10)
822
823 def test_garbage_collection(self):
824 # C BufferedReader objects are collected.
825 # The Python version has __del__, so it ends into gc.garbage instead
826 rawio = self.FileIO(support.TESTFN, "w+b")
827 f = self.tp(rawio)
828 f.f = f
829 wr = weakref.ref(f)
830 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000831 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000832 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000833
834class PyBufferedReaderTest(BufferedReaderTest):
835 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000836
Guido van Rossuma9e20242007-03-08 00:43:48 +0000837
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000838class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
839 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000840
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000841 def test_constructor(self):
842 rawio = self.MockRawIO()
843 bufio = self.tp(rawio)
844 bufio.__init__(rawio)
845 bufio.__init__(rawio, buffer_size=1024)
846 bufio.__init__(rawio, buffer_size=16)
847 self.assertEquals(3, bufio.write(b"abc"))
848 bufio.flush()
849 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
850 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
851 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
852 bufio.__init__(rawio)
853 self.assertEquals(3, bufio.write(b"ghi"))
854 bufio.flush()
855 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
856
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000857 def test_detach_flush(self):
858 raw = self.MockRawIO()
859 buf = self.tp(raw)
860 buf.write(b"howdy!")
861 self.assertFalse(raw._write_stack)
862 buf.detach()
863 self.assertEqual(raw._write_stack, [b"howdy!"])
864
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000865 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000866 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000867 writer = self.MockRawIO()
868 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000869 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000870 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000871
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000872 def test_write_overflow(self):
873 writer = self.MockRawIO()
874 bufio = self.tp(writer, 8)
875 contents = b"abcdefghijklmnop"
876 for n in range(0, len(contents), 3):
877 bufio.write(contents[n:n+3])
878 flushed = b"".join(writer._write_stack)
879 # At least (total - 8) bytes were implicitly flushed, perhaps more
880 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000881 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000882
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000883 def check_writes(self, intermediate_func):
884 # Lots of writes, test the flushed output is as expected.
885 contents = bytes(range(256)) * 1000
886 n = 0
887 writer = self.MockRawIO()
888 bufio = self.tp(writer, 13)
889 # Generator of write sizes: repeat each N 15 times then proceed to N+1
890 def gen_sizes():
891 for size in count(1):
892 for i in range(15):
893 yield size
894 sizes = gen_sizes()
895 while n < len(contents):
896 size = min(next(sizes), len(contents) - n)
897 self.assertEquals(bufio.write(contents[n:n+size]), size)
898 intermediate_func(bufio)
899 n += size
900 bufio.flush()
901 self.assertEquals(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000902
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000903 def test_writes(self):
904 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000905
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000906 def test_writes_and_flushes(self):
907 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +0000908
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000909 def test_writes_and_seeks(self):
910 def _seekabs(bufio):
911 pos = bufio.tell()
912 bufio.seek(pos + 1, 0)
913 bufio.seek(pos - 1, 0)
914 bufio.seek(pos, 0)
915 self.check_writes(_seekabs)
916 def _seekrel(bufio):
917 pos = bufio.seek(0, 1)
918 bufio.seek(+1, 1)
919 bufio.seek(-1, 1)
920 bufio.seek(pos, 0)
921 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +0000922
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000923 def test_writes_and_truncates(self):
924 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +0000925
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000926 def test_write_non_blocking(self):
927 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +0000928 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +0000929
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000930 self.assertEquals(bufio.write(b"abcd"), 4)
931 self.assertEquals(bufio.write(b"efghi"), 5)
932 # 1 byte will be written, the rest will be buffered
933 raw.block_on(b"k")
934 self.assertEquals(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +0000935
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000936 # 8 bytes will be written, 8 will be buffered and the rest will be lost
937 raw.block_on(b"0")
938 try:
939 bufio.write(b"opqrwxyz0123456789")
940 except self.BlockingIOError as e:
941 written = e.characters_written
942 else:
943 self.fail("BlockingIOError should have been raised")
944 self.assertEquals(written, 16)
945 self.assertEquals(raw.pop_written(),
946 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +0000947
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000948 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
949 s = raw.pop_written()
950 # Previously buffered bytes were flushed
951 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +0000952
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000953 def test_write_and_rewind(self):
954 raw = io.BytesIO()
955 bufio = self.tp(raw, 4)
956 self.assertEqual(bufio.write(b"abcdef"), 6)
957 self.assertEqual(bufio.tell(), 6)
958 bufio.seek(0, 0)
959 self.assertEqual(bufio.write(b"XY"), 2)
960 bufio.seek(6, 0)
961 self.assertEqual(raw.getvalue(), b"XYcdef")
962 self.assertEqual(bufio.write(b"123456"), 6)
963 bufio.flush()
964 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000965
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000966 def test_flush(self):
967 writer = self.MockRawIO()
968 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000969 bufio.write(b"abc")
970 bufio.flush()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000971 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000972
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000973 def test_destructor(self):
974 writer = self.MockRawIO()
975 bufio = self.tp(writer, 8)
976 bufio.write(b"abc")
977 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000978 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000979 self.assertEquals(b"abc", writer._write_stack[0])
980
981 def test_truncate(self):
982 # Truncate implicitly flushes the buffer.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000983 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000984 bufio = self.tp(raw, 8)
985 bufio.write(b"abcdef")
986 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000987 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000988 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000989 self.assertEqual(f.read(), b"abc")
990
991 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000992 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000993 # Write out many bytes from many threads and test they were
994 # all flushed.
995 N = 1000
996 contents = bytes(range(256)) * N
997 sizes = cycle([1, 19])
998 n = 0
999 queue = deque()
1000 while n < len(contents):
1001 size = next(sizes)
1002 queue.append(contents[n:n+size])
1003 n += size
1004 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001005 # We use a real file object because it allows us to
1006 # exercise situations where the GIL is released before
1007 # writing the buffer to the raw streams. This is in addition
1008 # to concurrency issues due to switching threads in the middle
1009 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001010 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001011 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001012 errors = []
1013 def f():
1014 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001015 while True:
1016 try:
1017 s = queue.popleft()
1018 except IndexError:
1019 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001020 bufio.write(s)
1021 except Exception as e:
1022 errors.append(e)
1023 raise
1024 threads = [threading.Thread(target=f) for x in range(20)]
1025 for t in threads:
1026 t.start()
1027 time.sleep(0.02) # yield
1028 for t in threads:
1029 t.join()
1030 self.assertFalse(errors,
1031 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001032 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001033 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001034 s = f.read()
1035 for i in range(256):
1036 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001037 finally:
1038 support.unlink(support.TESTFN)
1039
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001040 def test_misbehaved_io(self):
1041 rawio = self.MisbehavedRawIO()
1042 bufio = self.tp(rawio, 5)
1043 self.assertRaises(IOError, bufio.seek, 0)
1044 self.assertRaises(IOError, bufio.tell)
1045 self.assertRaises(IOError, bufio.write, b"abcdef")
1046
Benjamin Peterson59406a92009-03-26 17:10:29 +00001047 def test_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001048 with support.check_warnings(("max_buffer_size is deprecated",
1049 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001050 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001051
1052
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001053class CBufferedWriterTest(BufferedWriterTest):
1054 tp = io.BufferedWriter
1055
1056 def test_constructor(self):
1057 BufferedWriterTest.test_constructor(self)
1058 # The allocation can succeed on 32-bit builds, e.g. with more
1059 # than 2GB RAM and a 64-bit kernel.
1060 if sys.maxsize > 0x7FFFFFFF:
1061 rawio = self.MockRawIO()
1062 bufio = self.tp(rawio)
1063 self.assertRaises((OverflowError, MemoryError, ValueError),
1064 bufio.__init__, rawio, sys.maxsize)
1065
1066 def test_initialization(self):
1067 rawio = self.MockRawIO()
1068 bufio = self.tp(rawio)
1069 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1070 self.assertRaises(ValueError, bufio.write, b"def")
1071 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1072 self.assertRaises(ValueError, bufio.write, b"def")
1073 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1074 self.assertRaises(ValueError, bufio.write, b"def")
1075
1076 def test_garbage_collection(self):
1077 # C BufferedWriter objects are collected, and collecting them flushes
1078 # all data to disk.
1079 # The Python version has __del__, so it ends into gc.garbage instead
1080 rawio = self.FileIO(support.TESTFN, "w+b")
1081 f = self.tp(rawio)
1082 f.write(b"123xxx")
1083 f.x = f
1084 wr = weakref.ref(f)
1085 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001086 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001087 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001088 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001089 self.assertEqual(f.read(), b"123xxx")
1090
1091
1092class PyBufferedWriterTest(BufferedWriterTest):
1093 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001094
Guido van Rossum01a27522007-03-07 01:00:12 +00001095class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001096
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001097 def test_constructor(self):
1098 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001099 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001100
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001101 def test_detach(self):
1102 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1103 self.assertRaises(self.UnsupportedOperation, pair.detach)
1104
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001105 def test_constructor_max_buffer_size_deprecation(self):
Florent Xicluna8fbddf12010-03-17 20:29:51 +00001106 with support.check_warnings(("max_buffer_size is deprecated",
1107 DeprecationWarning)):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001108 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001109
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001110 def test_constructor_with_not_readable(self):
1111 class NotReadable(MockRawIO):
1112 def readable(self):
1113 return False
1114
1115 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1116
1117 def test_constructor_with_not_writeable(self):
1118 class NotWriteable(MockRawIO):
1119 def writable(self):
1120 return False
1121
1122 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1123
1124 def test_read(self):
1125 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1126
1127 self.assertEqual(pair.read(3), b"abc")
1128 self.assertEqual(pair.read(1), b"d")
1129 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001130 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1131 self.assertEqual(pair.read(None), b"abc")
1132
1133 def test_readlines(self):
1134 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1135 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1136 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1137 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001138
1139 def test_read1(self):
1140 # .read1() is delegated to the underlying reader object, so this test
1141 # can be shallow.
1142 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1143
1144 self.assertEqual(pair.read1(3), b"abc")
1145
1146 def test_readinto(self):
1147 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1148
1149 data = bytearray(5)
1150 self.assertEqual(pair.readinto(data), 5)
1151 self.assertEqual(data, b"abcde")
1152
1153 def test_write(self):
1154 w = self.MockRawIO()
1155 pair = self.tp(self.MockRawIO(), w)
1156
1157 pair.write(b"abc")
1158 pair.flush()
1159 pair.write(b"def")
1160 pair.flush()
1161 self.assertEqual(w._write_stack, [b"abc", b"def"])
1162
1163 def test_peek(self):
1164 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1165
1166 self.assertTrue(pair.peek(3).startswith(b"abc"))
1167 self.assertEqual(pair.read(3), b"abc")
1168
1169 def test_readable(self):
1170 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1171 self.assertTrue(pair.readable())
1172
1173 def test_writeable(self):
1174 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1175 self.assertTrue(pair.writable())
1176
1177 def test_seekable(self):
1178 # BufferedRWPairs are never seekable, even if their readers and writers
1179 # are.
1180 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1181 self.assertFalse(pair.seekable())
1182
1183 # .flush() is delegated to the underlying writer object and has been
1184 # tested in the test_write method.
1185
1186 def test_close_and_closed(self):
1187 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1188 self.assertFalse(pair.closed)
1189 pair.close()
1190 self.assertTrue(pair.closed)
1191
1192 def test_isatty(self):
1193 class SelectableIsAtty(MockRawIO):
1194 def __init__(self, isatty):
1195 MockRawIO.__init__(self)
1196 self._isatty = isatty
1197
1198 def isatty(self):
1199 return self._isatty
1200
1201 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1202 self.assertFalse(pair.isatty())
1203
1204 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1205 self.assertTrue(pair.isatty())
1206
1207 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1208 self.assertTrue(pair.isatty())
1209
1210 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1211 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001212
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001213class CBufferedRWPairTest(BufferedRWPairTest):
1214 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001215
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001216class PyBufferedRWPairTest(BufferedRWPairTest):
1217 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001218
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001219
1220class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1221 read_mode = "rb+"
1222 write_mode = "wb+"
1223
1224 def test_constructor(self):
1225 BufferedReaderTest.test_constructor(self)
1226 BufferedWriterTest.test_constructor(self)
1227
1228 def test_read_and_write(self):
1229 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001230 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001231
1232 self.assertEqual(b"as", rw.read(2))
1233 rw.write(b"ddd")
1234 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001235 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001236 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001237 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001238
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001239 def test_seek_and_tell(self):
1240 raw = self.BytesIO(b"asdfghjkl")
1241 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001242
1243 self.assertEquals(b"as", rw.read(2))
1244 self.assertEquals(2, rw.tell())
1245 rw.seek(0, 0)
1246 self.assertEquals(b"asdf", rw.read(4))
1247
1248 rw.write(b"asdf")
1249 rw.seek(0, 0)
1250 self.assertEquals(b"asdfasdfl", rw.read())
1251 self.assertEquals(9, rw.tell())
1252 rw.seek(-4, 2)
1253 self.assertEquals(5, rw.tell())
1254 rw.seek(2, 1)
1255 self.assertEquals(7, rw.tell())
1256 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001257 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001258
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001259 def check_flush_and_read(self, read_func):
1260 raw = self.BytesIO(b"abcdefghi")
1261 bufio = self.tp(raw)
1262
1263 self.assertEquals(b"ab", read_func(bufio, 2))
1264 bufio.write(b"12")
1265 self.assertEquals(b"ef", read_func(bufio, 2))
1266 self.assertEquals(6, bufio.tell())
1267 bufio.flush()
1268 self.assertEquals(6, bufio.tell())
1269 self.assertEquals(b"ghi", read_func(bufio))
1270 raw.seek(0, 0)
1271 raw.write(b"XYZ")
1272 # flush() resets the read buffer
1273 bufio.flush()
1274 bufio.seek(0, 0)
1275 self.assertEquals(b"XYZ", read_func(bufio, 3))
1276
1277 def test_flush_and_read(self):
1278 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1279
1280 def test_flush_and_readinto(self):
1281 def _readinto(bufio, n=-1):
1282 b = bytearray(n if n >= 0 else 9999)
1283 n = bufio.readinto(b)
1284 return bytes(b[:n])
1285 self.check_flush_and_read(_readinto)
1286
1287 def test_flush_and_peek(self):
1288 def _peek(bufio, n=-1):
1289 # This relies on the fact that the buffer can contain the whole
1290 # raw stream, otherwise peek() can return less.
1291 b = bufio.peek(n)
1292 if n != -1:
1293 b = b[:n]
1294 bufio.seek(len(b), 1)
1295 return b
1296 self.check_flush_and_read(_peek)
1297
1298 def test_flush_and_write(self):
1299 raw = self.BytesIO(b"abcdefghi")
1300 bufio = self.tp(raw)
1301
1302 bufio.write(b"123")
1303 bufio.flush()
1304 bufio.write(b"45")
1305 bufio.flush()
1306 bufio.seek(0, 0)
1307 self.assertEquals(b"12345fghi", raw.getvalue())
1308 self.assertEquals(b"12345fghi", bufio.read())
1309
1310 def test_threads(self):
1311 BufferedReaderTest.test_threads(self)
1312 BufferedWriterTest.test_threads(self)
1313
1314 def test_writes_and_peek(self):
1315 def _peek(bufio):
1316 bufio.peek(1)
1317 self.check_writes(_peek)
1318 def _peek(bufio):
1319 pos = bufio.tell()
1320 bufio.seek(-1, 1)
1321 bufio.peek(1)
1322 bufio.seek(pos, 0)
1323 self.check_writes(_peek)
1324
1325 def test_writes_and_reads(self):
1326 def _read(bufio):
1327 bufio.seek(-1, 1)
1328 bufio.read(1)
1329 self.check_writes(_read)
1330
1331 def test_writes_and_read1s(self):
1332 def _read1(bufio):
1333 bufio.seek(-1, 1)
1334 bufio.read1(1)
1335 self.check_writes(_read1)
1336
1337 def test_writes_and_readintos(self):
1338 def _read(bufio):
1339 bufio.seek(-1, 1)
1340 bufio.readinto(bytearray(1))
1341 self.check_writes(_read)
1342
Antoine Pitroua0ceb732009-08-06 20:29:56 +00001343 def test_write_after_readahead(self):
1344 # Issue #6629: writing after the buffer was filled by readahead should
1345 # first rewind the raw stream.
1346 for overwrite_size in [1, 5]:
1347 raw = self.BytesIO(b"A" * 10)
1348 bufio = self.tp(raw, 4)
1349 # Trigger readahead
1350 self.assertEqual(bufio.read(1), b"A")
1351 self.assertEqual(bufio.tell(), 1)
1352 # Overwriting should rewind the raw stream if it needs so
1353 bufio.write(b"B" * overwrite_size)
1354 self.assertEqual(bufio.tell(), overwrite_size + 1)
1355 # If the write size was smaller than the buffer size, flush() and
1356 # check that rewind happens.
1357 bufio.flush()
1358 self.assertEqual(bufio.tell(), overwrite_size + 1)
1359 s = raw.getvalue()
1360 self.assertEqual(s,
1361 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1362
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001363 def test_truncate_after_read_or_write(self):
1364 raw = self.BytesIO(b"A" * 10)
1365 bufio = self.tp(raw, 100)
1366 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1367 self.assertEqual(bufio.truncate(), 2)
1368 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1369 self.assertEqual(bufio.truncate(), 4)
1370
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001371 def test_misbehaved_io(self):
1372 BufferedReaderTest.test_misbehaved_io(self)
1373 BufferedWriterTest.test_misbehaved_io(self)
1374
1375class CBufferedRandomTest(BufferedRandomTest):
1376 tp = io.BufferedRandom
1377
1378 def test_constructor(self):
1379 BufferedRandomTest.test_constructor(self)
1380 # The allocation can succeed on 32-bit builds, e.g. with more
1381 # than 2GB RAM and a 64-bit kernel.
1382 if sys.maxsize > 0x7FFFFFFF:
1383 rawio = self.MockRawIO()
1384 bufio = self.tp(rawio)
1385 self.assertRaises((OverflowError, MemoryError, ValueError),
1386 bufio.__init__, rawio, sys.maxsize)
1387
1388 def test_garbage_collection(self):
1389 CBufferedReaderTest.test_garbage_collection(self)
1390 CBufferedWriterTest.test_garbage_collection(self)
1391
1392class PyBufferedRandomTest(BufferedRandomTest):
1393 tp = pyio.BufferedRandom
1394
1395
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001396# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1397# properties:
1398# - A single output character can correspond to many bytes of input.
1399# - The number of input bytes to complete the character can be
1400# undetermined until the last input byte is received.
1401# - The number of input bytes can vary depending on previous input.
1402# - A single input byte can correspond to many characters of output.
1403# - The number of output characters can be undetermined until the
1404# last input byte is received.
1405# - The number of output characters can vary depending on previous input.
1406
1407class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1408 """
1409 For testing seek/tell behavior with a stateful, buffering decoder.
1410
1411 Input is a sequence of words. Words may be fixed-length (length set
1412 by input) or variable-length (period-terminated). In variable-length
1413 mode, extra periods are ignored. Possible words are:
1414 - 'i' followed by a number sets the input length, I (maximum 99).
1415 When I is set to 0, words are space-terminated.
1416 - 'o' followed by a number sets the output length, O (maximum 99).
1417 - Any other word is converted into a word followed by a period on
1418 the output. The output word consists of the input word truncated
1419 or padded out with hyphens to make its length equal to O. If O
1420 is 0, the word is output verbatim without truncating or padding.
1421 I and O are initially set to 1. When I changes, any buffered input is
1422 re-scanned according to the new I. EOF also terminates the last word.
1423 """
1424
1425 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001426 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001427 self.reset()
1428
1429 def __repr__(self):
1430 return '<SID %x>' % id(self)
1431
1432 def reset(self):
1433 self.i = 1
1434 self.o = 1
1435 self.buffer = bytearray()
1436
1437 def getstate(self):
1438 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1439 return bytes(self.buffer), i*100 + o
1440
1441 def setstate(self, state):
1442 buffer, io = state
1443 self.buffer = bytearray(buffer)
1444 i, o = divmod(io, 100)
1445 self.i, self.o = i ^ 1, o ^ 1
1446
1447 def decode(self, input, final=False):
1448 output = ''
1449 for b in input:
1450 if self.i == 0: # variable-length, terminated with period
1451 if b == ord('.'):
1452 if self.buffer:
1453 output += self.process_word()
1454 else:
1455 self.buffer.append(b)
1456 else: # fixed-length, terminate after self.i bytes
1457 self.buffer.append(b)
1458 if len(self.buffer) == self.i:
1459 output += self.process_word()
1460 if final and self.buffer: # EOF terminates the last word
1461 output += self.process_word()
1462 return output
1463
1464 def process_word(self):
1465 output = ''
1466 if self.buffer[0] == ord('i'):
1467 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1468 elif self.buffer[0] == ord('o'):
1469 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1470 else:
1471 output = self.buffer.decode('ascii')
1472 if len(output) < self.o:
1473 output += '-'*self.o # pad out with hyphens
1474 if self.o:
1475 output = output[:self.o] # truncate to output length
1476 output += '.'
1477 self.buffer = bytearray()
1478 return output
1479
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001480 codecEnabled = False
1481
1482 @classmethod
1483 def lookupTestDecoder(cls, name):
1484 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001485 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001486 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001487 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001488 incrementalencoder=None,
1489 streamreader=None, streamwriter=None,
1490 incrementaldecoder=cls)
1491
1492# Register the previous decoder for testing.
1493# Disabled by default, tests will enable it.
1494codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1495
1496
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001497class StatefulIncrementalDecoderTest(unittest.TestCase):
1498 """
1499 Make sure the StatefulIncrementalDecoder actually works.
1500 """
1501
1502 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001503 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001504 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001505 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001506 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001507 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001508 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001509 # I=0, O=6 (variable-length input, fixed-length output)
1510 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1511 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001512 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001513 # I=6, O=3 (fixed-length input > fixed-length output)
1514 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1515 # I=0, then 3; O=29, then 15 (with longer output)
1516 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1517 'a----------------------------.' +
1518 'b----------------------------.' +
1519 'cde--------------------------.' +
1520 'abcdefghijabcde.' +
1521 'a.b------------.' +
1522 '.c.------------.' +
1523 'd.e------------.' +
1524 'k--------------.' +
1525 'l--------------.' +
1526 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001527 ]
1528
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001529 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001530 # Try a few one-shot test cases.
1531 for input, eof, output in self.test_cases:
1532 d = StatefulIncrementalDecoder()
1533 self.assertEquals(d.decode(input, eof), output)
1534
1535 # Also test an unfinished decode, followed by forcing EOF.
1536 d = StatefulIncrementalDecoder()
1537 self.assertEquals(d.decode(b'oiabcd'), '')
1538 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001539
1540class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001541
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001542 def setUp(self):
1543 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1544 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001545 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001546
Guido van Rossumd0712812007-04-11 16:32:43 +00001547 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001548 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001549
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001550 def test_constructor(self):
1551 r = self.BytesIO(b"\xc3\xa9\n\n")
1552 b = self.BufferedReader(r, 1000)
1553 t = self.TextIOWrapper(b)
1554 t.__init__(b, encoding="latin1", newline="\r\n")
1555 self.assertEquals(t.encoding, "latin1")
1556 self.assertEquals(t.line_buffering, False)
1557 t.__init__(b, encoding="utf8", line_buffering=True)
1558 self.assertEquals(t.encoding, "utf8")
1559 self.assertEquals(t.line_buffering, True)
1560 self.assertEquals("\xe9\n", t.readline())
1561 self.assertRaises(TypeError, t.__init__, b, newline=42)
1562 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1563
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001564 def test_detach(self):
1565 r = self.BytesIO()
1566 b = self.BufferedWriter(r)
1567 t = self.TextIOWrapper(b)
1568 self.assertIs(t.detach(), b)
1569
1570 t = self.TextIOWrapper(b, encoding="ascii")
1571 t.write("howdy")
1572 self.assertFalse(r.getvalue())
1573 t.detach()
1574 self.assertEqual(r.getvalue(), b"howdy")
1575 self.assertRaises(ValueError, t.detach)
1576
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001577 def test_repr(self):
1578 raw = self.BytesIO("hello".encode("utf-8"))
1579 b = self.BufferedReader(raw)
1580 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001581 modname = self.TextIOWrapper.__module__
1582 self.assertEqual(repr(t),
1583 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1584 raw.name = "dummy"
1585 self.assertEqual(repr(t),
1586 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1587 raw.name = b"dummy"
1588 self.assertEqual(repr(t),
1589 "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001590
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001591 def test_line_buffering(self):
1592 r = self.BytesIO()
1593 b = self.BufferedWriter(r, 1000)
1594 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001595 t.write("X")
1596 self.assertEquals(r.getvalue(), b"") # No flush happened
1597 t.write("Y\nZ")
1598 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1599 t.write("A\rB")
1600 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1601
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001602 def test_encoding(self):
1603 # Check the encoding attribute is always set, and valid
1604 b = self.BytesIO()
1605 t = self.TextIOWrapper(b, encoding="utf8")
1606 self.assertEqual(t.encoding, "utf8")
1607 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001608 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001609 codecs.lookup(t.encoding)
1610
1611 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001612 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001613 b = self.BytesIO(b"abc\n\xff\n")
1614 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001615 self.assertRaises(UnicodeError, t.read)
1616 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001617 b = self.BytesIO(b"abc\n\xff\n")
1618 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001619 self.assertRaises(UnicodeError, t.read)
1620 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001621 b = self.BytesIO(b"abc\n\xff\n")
1622 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001623 self.assertEquals(t.read(), "abc\n\n")
1624 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001625 b = self.BytesIO(b"abc\n\xff\n")
1626 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001627 self.assertEquals(t.read(), "abc\n\ufffd\n")
1628
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001629 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001630 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001631 b = self.BytesIO()
1632 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001633 self.assertRaises(UnicodeError, t.write, "\xff")
1634 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001635 b = self.BytesIO()
1636 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001637 self.assertRaises(UnicodeError, t.write, "\xff")
1638 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001639 b = self.BytesIO()
1640 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001641 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001642 t.write("abc\xffdef\n")
1643 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001644 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001645 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001646 b = self.BytesIO()
1647 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001648 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001649 t.write("abc\xffdef\n")
1650 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001651 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001652
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001653 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001654 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1655
1656 tests = [
1657 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001658 [ '', input_lines ],
1659 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1660 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1661 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001662 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001663 encodings = (
1664 'utf-8', 'latin-1',
1665 'utf-16', 'utf-16-le', 'utf-16-be',
1666 'utf-32', 'utf-32-le', 'utf-32-be',
1667 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001668
Guido van Rossum8358db22007-08-18 21:39:55 +00001669 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001670 # character in TextIOWrapper._pending_line.
1671 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001672 # XXX: str.encode() should return bytes
1673 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001674 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001675 for bufsize in range(1, 10):
1676 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001677 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1678 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001679 encoding=encoding)
1680 if do_reads:
1681 got_lines = []
1682 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001683 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001684 if c2 == '':
1685 break
1686 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001687 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001688 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001689 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001690
1691 for got_line, exp_line in zip(got_lines, exp_lines):
1692 self.assertEquals(got_line, exp_line)
1693 self.assertEquals(len(got_lines), len(exp_lines))
1694
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001695 def test_newlines_input(self):
1696 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001697 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1698 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001699 (None, normalized.decode("ascii").splitlines(True)),
1700 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001701 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1702 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1703 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001704 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001705 buf = self.BytesIO(testdata)
1706 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001707 self.assertEquals(txt.readlines(), expected)
1708 txt.seek(0)
1709 self.assertEquals(txt.read(), "".join(expected))
1710
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001711 def test_newlines_output(self):
1712 testdict = {
1713 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1714 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1715 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1716 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1717 }
1718 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1719 for newline, expected in tests:
1720 buf = self.BytesIO()
1721 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1722 txt.write("AAA\nB")
1723 txt.write("BB\nCCC\n")
1724 txt.write("X\rY\r\nZ")
1725 txt.flush()
1726 self.assertEquals(buf.closed, False)
1727 self.assertEquals(buf.getvalue(), expected)
1728
1729 def test_destructor(self):
1730 l = []
1731 base = self.BytesIO
1732 class MyBytesIO(base):
1733 def close(self):
1734 l.append(self.getvalue())
1735 base.close(self)
1736 b = MyBytesIO()
1737 t = self.TextIOWrapper(b, encoding="ascii")
1738 t.write("abc")
1739 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001740 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001741 self.assertEquals([b"abc"], l)
1742
1743 def test_override_destructor(self):
1744 record = []
1745 class MyTextIO(self.TextIOWrapper):
1746 def __del__(self):
1747 record.append(1)
1748 try:
1749 f = super().__del__
1750 except AttributeError:
1751 pass
1752 else:
1753 f()
1754 def close(self):
1755 record.append(2)
1756 super().close()
1757 def flush(self):
1758 record.append(3)
1759 super().flush()
1760 b = self.BytesIO()
1761 t = MyTextIO(b, encoding="ascii")
1762 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001763 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001764 self.assertEqual(record, [1, 2, 3])
1765
1766 def test_error_through_destructor(self):
1767 # Test that the exception state is not modified by a destructor,
1768 # even if close() fails.
1769 rawio = self.CloseFailureIO()
1770 def f():
1771 self.TextIOWrapper(rawio).xyzzy
1772 with support.captured_output("stderr") as s:
1773 self.assertRaises(AttributeError, f)
1774 s = s.getvalue().strip()
1775 if s:
1776 # The destructor *may* have printed an unraisable error, check it
1777 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001778 self.assertTrue(s.startswith("Exception IOError: "), s)
1779 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001780
Guido van Rossum9b76da62007-04-11 01:09:03 +00001781 # Systematic tests of the text I/O API
1782
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001783 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001784 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1785 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001786 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001787 f._CHUNK_SIZE = chunksize
1788 self.assertEquals(f.write("abc"), 3)
1789 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001790 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001791 f._CHUNK_SIZE = chunksize
1792 self.assertEquals(f.tell(), 0)
1793 self.assertEquals(f.read(), "abc")
1794 cookie = f.tell()
1795 self.assertEquals(f.seek(0), 0)
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001796 self.assertEquals(f.read(None), "abc")
1797 f.seek(0)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001798 self.assertEquals(f.read(2), "ab")
1799 self.assertEquals(f.read(1), "c")
1800 self.assertEquals(f.read(1), "")
1801 self.assertEquals(f.read(), "")
1802 self.assertEquals(f.tell(), cookie)
1803 self.assertEquals(f.seek(0), 0)
1804 self.assertEquals(f.seek(0, 2), cookie)
1805 self.assertEquals(f.write("def"), 3)
1806 self.assertEquals(f.seek(cookie), cookie)
1807 self.assertEquals(f.read(), "def")
1808 if enc.startswith("utf"):
1809 self.multi_line_test(f, enc)
1810 f.close()
1811
1812 def multi_line_test(self, f, enc):
1813 f.seek(0)
1814 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001815 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001816 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001817 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001818 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001819 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001820 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001821 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001822 wlines.append((f.tell(), line))
1823 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001824 f.seek(0)
1825 rlines = []
1826 while True:
1827 pos = f.tell()
1828 line = f.readline()
1829 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001830 break
1831 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +00001832 self.assertEquals(rlines, wlines)
1833
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001834 def test_telling(self):
1835 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001836 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001837 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001838 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001839 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001840 p2 = f.tell()
1841 f.seek(0)
1842 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001843 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001844 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001845 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001846 self.assertEquals(f.tell(), p2)
1847 f.seek(0)
1848 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001849 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001850 self.assertRaises(IOError, f.tell)
1851 self.assertEquals(f.tell(), p2)
1852 f.close()
1853
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001854 def test_seeking(self):
1855 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001856 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001857 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001858 prefix = bytes(u_prefix.encode("utf-8"))
1859 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001860 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001861 suffix = bytes(u_suffix.encode("utf-8"))
1862 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001863 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001864 f.write(line*2)
1865 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001866 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001867 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001868 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001869 self.assertEquals(f.tell(), prefix_size)
1870 self.assertEquals(f.readline(), u_suffix)
1871
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001872 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00001873 # Regression test for a specific bug
1874 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001875 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001876 f.write(data)
1877 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001878 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001879 f._CHUNK_SIZE # Just test that it exists
1880 f._CHUNK_SIZE = 2
1881 f.readline()
1882 f.tell()
1883
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001884 def test_seek_and_tell(self):
1885 #Test seek/tell using the StatefulIncrementalDecoder.
1886 # Make test faster by doing smaller seeks
1887 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001888
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001889 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001890 """Tell/seek to various points within a data stream and ensure
1891 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001892 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001893 f.write(data)
1894 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001895 f = self.open(support.TESTFN, encoding='test_decoder')
1896 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001897 decoded = f.read()
1898 f.close()
1899
Neal Norwitze2b07052008-03-18 19:52:05 +00001900 for i in range(min_pos, len(decoded) + 1): # seek positions
1901 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001902 f = self.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001903 self.assertEquals(f.read(i), decoded[:i])
1904 cookie = f.tell()
1905 self.assertEquals(f.read(j), decoded[i:i + j])
1906 f.seek(cookie)
1907 self.assertEquals(f.read(), decoded[i:])
1908 f.close()
1909
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001910 # Enable the test decoder.
1911 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001912
1913 # Run the tests.
1914 try:
1915 # Try each test case.
1916 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001917 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001918
1919 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001920 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1921 offset = CHUNK_SIZE - len(input)//2
1922 prefix = b'.'*offset
1923 # Don't bother seeking into the prefix (takes too long).
1924 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001925 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001926
1927 # Ensure our test decoder won't interfere with subsequent tests.
1928 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001929 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001930
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001931 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001932 data = "1234567890"
1933 tests = ("utf-16",
1934 "utf-16-le",
1935 "utf-16-be",
1936 "utf-32",
1937 "utf-32-le",
1938 "utf-32-be")
1939 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001940 buf = self.BytesIO()
1941 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001942 # Check if the BOM is written only once (see issue1753).
1943 f.write(data)
1944 f.write(data)
1945 f.seek(0)
1946 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00001947 f.seek(0)
1948 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001949 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1950
Benjamin Petersona1b49012009-03-31 23:11:32 +00001951 def test_unreadable(self):
1952 class UnReadable(self.BytesIO):
1953 def readable(self):
1954 return False
1955 txt = self.TextIOWrapper(UnReadable())
1956 self.assertRaises(IOError, txt.read)
1957
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001958 def test_read_one_by_one(self):
1959 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001960 reads = ""
1961 while True:
1962 c = txt.read(1)
1963 if not c:
1964 break
1965 reads += c
1966 self.assertEquals(reads, "AA\nBB")
1967
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001968 def test_readlines(self):
1969 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
1970 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
1971 txt.seek(0)
1972 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
1973 txt.seek(0)
1974 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
1975
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001976 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001977 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001978 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001979 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001980 reads = ""
1981 while True:
1982 c = txt.read(128)
1983 if not c:
1984 break
1985 reads += c
1986 self.assertEquals(reads, "A"*127+"\nB")
1987
1988 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001989 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001990
1991 # read one char at a time
1992 reads = ""
1993 while True:
1994 c = txt.read(1)
1995 if not c:
1996 break
1997 reads += c
1998 self.assertEquals(reads, self.normalized)
1999
2000 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002001 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002002 txt._CHUNK_SIZE = 4
2003
2004 reads = ""
2005 while True:
2006 c = txt.read(4)
2007 if not c:
2008 break
2009 reads += c
2010 self.assertEquals(reads, self.normalized)
2011
2012 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002013 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002014 txt._CHUNK_SIZE = 4
2015
2016 reads = txt.read(4)
2017 reads += txt.read(4)
2018 reads += txt.readline()
2019 reads += txt.readline()
2020 reads += txt.readline()
2021 self.assertEquals(reads, self.normalized)
2022
2023 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002024 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002025 txt._CHUNK_SIZE = 4
2026
2027 reads = txt.read(4)
2028 reads += txt.read()
2029 self.assertEquals(reads, self.normalized)
2030
2031 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002032 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002033 txt._CHUNK_SIZE = 4
2034
2035 reads = txt.read(4)
2036 pos = txt.tell()
2037 txt.seek(0)
2038 txt.seek(pos)
2039 self.assertEquals(txt.read(4), "BBB\n")
2040
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002041 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002042 buffer = self.BytesIO(self.testdata)
2043 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00002044
2045 self.assertEqual(buffer.seekable(), txt.seekable())
2046
Antoine Pitroue4501852009-05-14 18:55:55 +00002047 def test_append_bom(self):
2048 # The BOM is not written again when appending to a non-empty file
2049 filename = support.TESTFN
2050 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2051 with self.open(filename, 'w', encoding=charset) as f:
2052 f.write('aaa')
2053 pos = f.tell()
2054 with self.open(filename, 'rb') as f:
2055 self.assertEquals(f.read(), 'aaa'.encode(charset))
2056
2057 with self.open(filename, 'a', encoding=charset) as f:
2058 f.write('xxx')
2059 with self.open(filename, 'rb') as f:
2060 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
2061
2062 def test_seek_bom(self):
2063 # Same test, but when seeking manually
2064 filename = support.TESTFN
2065 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2066 with self.open(filename, 'w', encoding=charset) as f:
2067 f.write('aaa')
2068 pos = f.tell()
2069 with self.open(filename, 'r+', encoding=charset) as f:
2070 f.seek(pos)
2071 f.write('zzz')
2072 f.seek(0)
2073 f.write('bbb')
2074 with self.open(filename, 'rb') as f:
2075 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2076
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002077 def test_errors_property(self):
2078 with self.open(support.TESTFN, "w") as f:
2079 self.assertEqual(f.errors, "strict")
2080 with self.open(support.TESTFN, "w", errors="replace") as f:
2081 self.assertEqual(f.errors, "replace")
2082
Antoine Pitroue4501852009-05-14 18:55:55 +00002083
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00002084 def test_threads_write(self):
2085 # Issue6750: concurrent writes could duplicate data
2086 event = threading.Event()
2087 with self.open(support.TESTFN, "w", buffering=1) as f:
2088 def run(n):
2089 text = "Thread%03d\n" % n
2090 event.wait()
2091 f.write(text)
2092 threads = [threading.Thread(target=lambda n=x: run(n))
2093 for x in range(20)]
2094 for t in threads:
2095 t.start()
2096 time.sleep(0.02)
2097 event.set()
2098 for t in threads:
2099 t.join()
2100 with self.open(support.TESTFN) as f:
2101 content = f.read()
2102 for n in range(20):
2103 self.assertEquals(content.count("Thread%03d\n" % n), 1)
2104
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002105class CTextIOWrapperTest(TextIOWrapperTest):
2106
2107 def test_initialization(self):
2108 r = self.BytesIO(b"\xc3\xa9\n\n")
2109 b = self.BufferedReader(r, 1000)
2110 t = self.TextIOWrapper(b)
2111 self.assertRaises(TypeError, t.__init__, b, newline=42)
2112 self.assertRaises(ValueError, t.read)
2113 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2114 self.assertRaises(ValueError, t.read)
2115
2116 def test_garbage_collection(self):
2117 # C TextIOWrapper objects are collected, and collecting them flushes
2118 # all data to disk.
2119 # The Python version has __del__, so it ends in gc.garbage instead.
2120 rawio = io.FileIO(support.TESTFN, "wb")
2121 b = self.BufferedWriter(rawio)
2122 t = self.TextIOWrapper(b, encoding="ascii")
2123 t.write("456def")
2124 t.x = t
2125 wr = weakref.ref(t)
2126 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002127 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002128 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00002129 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002130 self.assertEqual(f.read(), b"456def")
2131
2132class PyTextIOWrapperTest(TextIOWrapperTest):
2133 pass
2134
2135
2136class IncrementalNewlineDecoderTest(unittest.TestCase):
2137
2138 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002139 # UTF-8 specific tests for a newline decoder
2140 def _check_decode(b, s, **kwargs):
2141 # We exercise getstate() / setstate() as well as decode()
2142 state = decoder.getstate()
2143 self.assertEquals(decoder.decode(b, **kwargs), s)
2144 decoder.setstate(state)
2145 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002146
Antoine Pitrou180a3362008-12-14 16:36:46 +00002147 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002148
Antoine Pitrou180a3362008-12-14 16:36:46 +00002149 _check_decode(b'\xe8', "")
2150 _check_decode(b'\xa2', "")
2151 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002152
Antoine Pitrou180a3362008-12-14 16:36:46 +00002153 _check_decode(b'\xe8', "")
2154 _check_decode(b'\xa2', "")
2155 _check_decode(b'\x88', "\u8888")
2156
2157 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002158 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2159
Antoine Pitrou180a3362008-12-14 16:36:46 +00002160 decoder.reset()
2161 _check_decode(b'\n', "\n")
2162 _check_decode(b'\r', "")
2163 _check_decode(b'', "\n", final=True)
2164 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002165
Antoine Pitrou180a3362008-12-14 16:36:46 +00002166 _check_decode(b'\r', "")
2167 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002168
Antoine Pitrou180a3362008-12-14 16:36:46 +00002169 _check_decode(b'\r\r\n', "\n\n")
2170 _check_decode(b'\r', "")
2171 _check_decode(b'\r', "\n")
2172 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002173
Antoine Pitrou180a3362008-12-14 16:36:46 +00002174 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2175 _check_decode(b'\xe8\xa2\x88', "\u8888")
2176 _check_decode(b'\n', "\n")
2177 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2178 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002179
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002180 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002181 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002182 if encoding is not None:
2183 encoder = codecs.getincrementalencoder(encoding)()
2184 def _decode_bytewise(s):
2185 # Decode one byte at a time
2186 for b in encoder.encode(s):
2187 result.append(decoder.decode(bytes([b])))
2188 else:
2189 encoder = None
2190 def _decode_bytewise(s):
2191 # Decode one char at a time
2192 for c in s:
2193 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002194 self.assertEquals(decoder.newlines, None)
2195 _decode_bytewise("abc\n\r")
2196 self.assertEquals(decoder.newlines, '\n')
2197 _decode_bytewise("\nabc")
2198 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2199 _decode_bytewise("abc\r")
2200 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2201 _decode_bytewise("abc")
2202 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2203 _decode_bytewise("abc\r")
2204 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2205 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002206 input = "abc"
2207 if encoder is not None:
2208 encoder.reset()
2209 input = encoder.encode(input)
2210 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002211 self.assertEquals(decoder.newlines, None)
2212
2213 def test_newline_decoder(self):
2214 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002215 # None meaning the IncrementalNewlineDecoder takes unicode input
2216 # rather than bytes input
2217 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002218 'utf-16', 'utf-16-le', 'utf-16-be',
2219 'utf-32', 'utf-32-le', 'utf-32-be',
2220 )
2221 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002222 decoder = enc and codecs.getincrementaldecoder(enc)()
2223 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2224 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002225 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002226 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2227 self.check_newline_decoding_utf8(decoder)
2228
Antoine Pitrou66913e22009-03-06 23:40:56 +00002229 def test_newline_bytes(self):
2230 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2231 def _check(dec):
2232 self.assertEquals(dec.newlines, None)
2233 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2234 self.assertEquals(dec.newlines, None)
2235 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2236 self.assertEquals(dec.newlines, None)
2237 dec = self.IncrementalNewlineDecoder(None, translate=False)
2238 _check(dec)
2239 dec = self.IncrementalNewlineDecoder(None, translate=True)
2240 _check(dec)
2241
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002242class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2243 pass
2244
2245class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2246 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002247
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002248
Guido van Rossum01a27522007-03-07 01:00:12 +00002249# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002250
Guido van Rossum5abbf752007-08-27 17:39:33 +00002251class MiscIOTest(unittest.TestCase):
2252
Barry Warsaw40e82462008-11-20 20:14:50 +00002253 def tearDown(self):
2254 support.unlink(support.TESTFN)
2255
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002256 def test___all__(self):
2257 for name in self.io.__all__:
2258 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002259 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002260 if name == "open":
2261 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002262 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002263 self.assertTrue(issubclass(obj, Exception), name)
2264 elif not name.startswith("SEEK_"):
2265 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002266
Barry Warsaw40e82462008-11-20 20:14:50 +00002267 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002268 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002269 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002270 f.close()
2271
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002272 f = self.open(support.TESTFN, "U")
Barry Warsaw40e82462008-11-20 20:14:50 +00002273 self.assertEquals(f.name, support.TESTFN)
2274 self.assertEquals(f.buffer.name, support.TESTFN)
2275 self.assertEquals(f.buffer.raw.name, support.TESTFN)
2276 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002277 self.assertEquals(f.buffer.mode, "rb")
2278 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002279 f.close()
2280
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002281 f = self.open(support.TESTFN, "w+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002282 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002283 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2284 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002285
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002286 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002287 self.assertEquals(g.mode, "wb")
2288 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002289 self.assertEquals(g.name, f.fileno())
2290 self.assertEquals(g.raw.name, f.fileno())
2291 f.close()
2292 g.close()
2293
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002294 def test_io_after_close(self):
2295 for kwargs in [
2296 {"mode": "w"},
2297 {"mode": "wb"},
2298 {"mode": "w", "buffering": 1},
2299 {"mode": "w", "buffering": 2},
2300 {"mode": "wb", "buffering": 0},
2301 {"mode": "r"},
2302 {"mode": "rb"},
2303 {"mode": "r", "buffering": 1},
2304 {"mode": "r", "buffering": 2},
2305 {"mode": "rb", "buffering": 0},
2306 {"mode": "w+"},
2307 {"mode": "w+b"},
2308 {"mode": "w+", "buffering": 1},
2309 {"mode": "w+", "buffering": 2},
2310 {"mode": "w+b", "buffering": 0},
2311 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002312 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002313 f.close()
2314 self.assertRaises(ValueError, f.flush)
2315 self.assertRaises(ValueError, f.fileno)
2316 self.assertRaises(ValueError, f.isatty)
2317 self.assertRaises(ValueError, f.__iter__)
2318 if hasattr(f, "peek"):
2319 self.assertRaises(ValueError, f.peek, 1)
2320 self.assertRaises(ValueError, f.read)
2321 if hasattr(f, "read1"):
2322 self.assertRaises(ValueError, f.read1, 1024)
2323 if hasattr(f, "readinto"):
2324 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2325 self.assertRaises(ValueError, f.readline)
2326 self.assertRaises(ValueError, f.readlines)
2327 self.assertRaises(ValueError, f.seek, 0)
2328 self.assertRaises(ValueError, f.tell)
2329 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002330 self.assertRaises(ValueError, f.write,
2331 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002332 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002333 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002334
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002335 def test_blockingioerror(self):
2336 # Various BlockingIOError issues
2337 self.assertRaises(TypeError, self.BlockingIOError)
2338 self.assertRaises(TypeError, self.BlockingIOError, 1)
2339 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2340 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2341 b = self.BlockingIOError(1, "")
2342 self.assertEqual(b.characters_written, 0)
2343 class C(str):
2344 pass
2345 c = C("")
2346 b = self.BlockingIOError(1, c)
2347 c.b = b
2348 b.c = c
2349 wr = weakref.ref(c)
2350 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002351 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002352 self.assertTrue(wr() is None, wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002353
2354 def test_abcs(self):
2355 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00002356 self.assertIsInstance(self.IOBase, abc.ABCMeta)
2357 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2358 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2359 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002360
2361 def _check_abc_inheritance(self, abcmodule):
2362 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002363 self.assertIsInstance(f, abcmodule.IOBase)
2364 self.assertIsInstance(f, abcmodule.RawIOBase)
2365 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2366 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002367 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002368 self.assertIsInstance(f, abcmodule.IOBase)
2369 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2370 self.assertIsInstance(f, abcmodule.BufferedIOBase)
2371 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002372 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00002373 self.assertIsInstance(f, abcmodule.IOBase)
2374 self.assertNotIsInstance(f, abcmodule.RawIOBase)
2375 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2376 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002377
2378 def test_abc_inheritance(self):
2379 # Test implementations inherit from their respective ABCs
2380 self._check_abc_inheritance(self)
2381
2382 def test_abc_inheritance_official(self):
2383 # Test implementations inherit from the official ABCs of the
2384 # baseline "io" module.
2385 self._check_abc_inheritance(io)
2386
2387class CMiscIOTest(MiscIOTest):
2388 io = io
2389
2390class PyMiscIOTest(MiscIOTest):
2391 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002392
Guido van Rossum28524c72007-02-27 05:47:44 +00002393def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002394 tests = (CIOTest, PyIOTest,
2395 CBufferedReaderTest, PyBufferedReaderTest,
2396 CBufferedWriterTest, PyBufferedWriterTest,
2397 CBufferedRWPairTest, PyBufferedRWPairTest,
2398 CBufferedRandomTest, PyBufferedRandomTest,
2399 StatefulIncrementalDecoderTest,
2400 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2401 CTextIOWrapperTest, PyTextIOWrapperTest,
2402 CMiscIOTest, PyMiscIOTest,)
2403
2404 # Put the namespaces of the IO module we are testing and some useful mock
2405 # classes in the __dict__ of each test.
2406 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
2407 MockNonBlockWriterIO)
2408 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2409 c_io_ns = {name : getattr(io, name) for name in all_members}
2410 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2411 globs = globals()
2412 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2413 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2414 # Avoid turning open into a bound method.
2415 py_io_ns["open"] = pyio.OpenWrapper
2416 for test in tests:
2417 if test.__name__.startswith("C"):
2418 for name, obj in c_io_ns.items():
2419 setattr(test, name, obj)
2420 elif test.__name__.startswith("Py"):
2421 for name, obj in py_io_ns.items():
2422 setattr(test, name, obj)
2423
2424 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002425
2426if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002427 test_main()