blob: 1f1cba299994913ee428fe7b29937ccb32a4316e [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
33from itertools import chain, cycle, count
34from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000035from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000036
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000037import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000038import io # C implementation of io
39import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000040
Guido van Rossuma9e20242007-03-08 00:43:48 +000041
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042def _default_chunk_size():
43 """Get the default TextIOWrapper chunk size"""
44 with open(__file__, "r", encoding="latin1") as f:
45 return f._CHUNK_SIZE
46
47
class MockRawIO:
    """Scripted stand-in for a raw stream.

    Reads are served from a queue of chunks supplied at construction time
    and every write is recorded, so a test can check exactly what was asked
    of the raw layer.  A ``None`` entry in the queue makes ``readinto()``
    return ``None`` (and ``read()`` hand back ``None`` as-is).
    """

    def __init__(self, read_stack=()):
        # Chunks still to be handed out, oldest first.
        self._read_stack = list(read_stack)
        # Every payload given to write(), in call order.
        self._write_stack = []
        # Number of read()/readinto() calls made so far.
        self._reads = 0

    def read(self, n=None):
        """Return the next queued chunk whole, or b"" once the queue is empty.

        *n* is accepted for signature compatibility and otherwise ignored.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue means EOF; anything else is a real
            # error and must not be swallowed.
            return b""

    def write(self, b):
        """Record a bytes copy of *b* and report all of it as written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary fixed descriptor number that tests can recognise.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # wrong for the same reason as seek()

    def readinto(self, buf):
        """Copy from the head of the queue into *buf*.

        Returns the number of bytes stored, 0 when the queue is empty, or
        None when the head entry is None.  A chunk larger than *buf* is
        split: the remainder stays at the head of the queue.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        # Nothing is actually truncated; the requested size is echoed back.
        return pos
106
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO layered on RawIOBase from the C ``io`` module."""
109
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO layered on RawIOBase from the pure-Python ``_pyio`` module."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000112
Guido van Rossuma9e20242007-03-08 00:43:48 +0000113
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately bogus results."""

    def write(self, b):
        # Claim twice as many bytes as were actually recorded.
        recorded = super().write(b)
        return recorded * 2

    def read(self, n=None):
        # Hand back the queued chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        # A negative position is never valid.
        return -123

    def tell(self):
        # A negative position is never valid.
        return -456

    def readinto(self, buf):
        # Fill the buffer normally, then report five times its capacity.
        super().readinto(buf)
        return 5 * len(buf)
130
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO layered on RawIOBase from the C ``io`` module."""
133
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO layered on RawIOBase from the pure-Python ``_pyio``."""
136
137
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() raises IOError; later calls do nothing."""

    # Class-level default; close() shadows it on the instance.
    closed = 0

    def close(self):
        if self.closed:
            return
        # Mark the object closed *before* failing so a repeated close()
        # (e.g. from a destructor) is a silent no-op.
        self.closed = 1
        raise IOError
145
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO layered on RawIOBase from the C ``io`` module."""
148
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO layered on RawIOBase from the pure-Python ``_pyio``."""
151
152
class MockFileIO:
    """Mixin that logs the size of every read performed on the stream.

    It delegates to ``super()`` for the real work, so it must be placed
    ahead of a concrete stream class that provides ``__init__(data)``,
    ``read()`` and ``readinto()``.
    """

    def __init__(self, data):
        # One entry per read()/readinto() call: the byte count obtained,
        # or None when the underlying call returned None.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000168
class CMockFileIO(MockFileIO, io.BytesIO):
    """Read-logging stream backed by BytesIO from the C ``io`` module."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000171
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """Read-logging stream backed by BytesIO from the pure-Python ``_pyio``."""
174
175
class MockNonBlockWriterIO:
    """Raw-stream mock that can pretend a write would block part-way.

    Subclasses must provide a ``BlockingIOError`` class attribute naming the
    exception type to raise.
    """

    def __init__(self):
        # Payloads accepted so far, in call order.
        self._write_stack = []
        # Byte sequence that triggers the next simulated block, if any.
        self._blocker_char = None

    def pop_written(self):
        """Return everything accepted so far as one bytes object and reset."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Accept *b*, or only the part before the armed blocker char.

        When the blocker is found, the prefix is recorded, the blocker is
        disarmed and ``self.BlockingIOError`` is raised carrying the prefix
        length.  Otherwise all of *b* is recorded and its length returned.
        """
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                cut = payload.index(blocker)
            except ValueError:
                # Blocker not present in this payload: write normally.
                pass
            else:
                self._blocker_char = None
                self._write_stack.append(payload[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(payload)
        return len(payload)
214
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """Blocking-writer mock bound to the C ``io`` module's types."""

    BlockingIOError = io.BlockingIOError
217
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """Blocking-writer mock bound to the pure-Python ``_pyio`` module's types."""

    BlockingIOError = pyio.BlockingIOError
220
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
Guido van Rossum28524c72007-02-27 05:47:44 +0000222class IOTest(unittest.TestCase):
223
Neal Norwitze7789b12008-03-24 06:18:09 +0000224 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000225 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000226
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000227 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000228 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000229
Guido van Rossum28524c72007-02-27 05:47:44 +0000230 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000231 self.assertEqual(f.write(b"blah."), 5)
232 self.assertEqual(f.seek(0), 0)
233 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000234 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000235 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000236 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000237 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000238 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000239 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000240 self.assertEqual(f.seek(-1, 2), 13)
241 self.assertEqual(f.tell(), 13)
242 self.assertEqual(f.truncate(12), 12)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000243 self.assertEqual(f.tell(), 12)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000244 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000245
Guido van Rossum9b76da62007-04-11 01:09:03 +0000246 def read_ops(self, f, buffered=False):
247 data = f.read(5)
248 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000249 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000250 self.assertEqual(f.readinto(data), 5)
251 self.assertEqual(data, b" worl")
252 self.assertEqual(f.readinto(data), 2)
253 self.assertEqual(len(data), 5)
254 self.assertEqual(data[:2], b"d\n")
255 self.assertEqual(f.seek(0), 0)
256 self.assertEqual(f.read(20), b"hello world\n")
257 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000258 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000259 self.assertEqual(f.seek(-6, 2), 6)
260 self.assertEqual(f.read(5), b"world")
261 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000262 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000263 self.assertEqual(f.seek(-6, 1), 5)
264 self.assertEqual(f.read(5), b" worl")
265 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000266 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000267 if buffered:
268 f.seek(0)
269 self.assertEqual(f.read(), b"hello world\n")
270 f.seek(6)
271 self.assertEqual(f.read(), b"world\n")
272 self.assertEqual(f.read(), b"")
273
Guido van Rossum34d69e52007-04-10 20:08:41 +0000274 LARGE = 2**31
275
Guido van Rossum53807da2007-04-10 19:01:47 +0000276 def large_file_ops(self, f):
277 assert f.readable()
278 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000279 self.assertEqual(f.seek(self.LARGE), self.LARGE)
280 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000281 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000282 self.assertEqual(f.tell(), self.LARGE + 3)
283 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000285 self.assertEqual(f.tell(), self.LARGE + 2)
286 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000287 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000288 self.assertEqual(f.tell(), self.LARGE + 1)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000289 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
290 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000291 self.assertEqual(f.read(2), b"x")
292
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000293 def test_invalid_operations(self):
294 # Try writing on a file opened in read mode and vice-versa.
295 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000296 with self.open(support.TESTFN, mode) as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000297 self.assertRaises(IOError, fp.read)
298 self.assertRaises(IOError, fp.readline)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000299 with self.open(support.TESTFN, "rb") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000300 self.assertRaises(IOError, fp.write, b"blah")
301 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000302 with self.open(support.TESTFN, "r") as fp:
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000303 self.assertRaises(IOError, fp.write, "blah")
304 self.assertRaises(IOError, fp.writelines, ["blah\n"])
305
Guido van Rossum28524c72007-02-27 05:47:44 +0000306 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000307 with self.open(support.TESTFN, "wb", buffering=0) as f:
308 self.assertEqual(f.readable(), False)
309 self.assertEqual(f.writable(), True)
310 self.assertEqual(f.seekable(), True)
311 self.write_ops(f)
312 with self.open(support.TESTFN, "rb", buffering=0) as f:
313 self.assertEqual(f.readable(), True)
314 self.assertEqual(f.writable(), False)
315 self.assertEqual(f.seekable(), True)
316 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000317
Guido van Rossum87429772007-04-10 21:06:59 +0000318 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000319 with self.open(support.TESTFN, "wb") as f:
320 self.assertEqual(f.readable(), False)
321 self.assertEqual(f.writable(), True)
322 self.assertEqual(f.seekable(), True)
323 self.write_ops(f)
324 with self.open(support.TESTFN, "rb") as f:
325 self.assertEqual(f.readable(), True)
326 self.assertEqual(f.writable(), False)
327 self.assertEqual(f.seekable(), True)
328 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000329
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000330 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000331 with self.open(support.TESTFN, "wb") as f:
332 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
333 with self.open(support.TESTFN, "rb") as f:
334 self.assertEqual(f.readline(), b"abc\n")
335 self.assertEqual(f.readline(10), b"def\n")
336 self.assertEqual(f.readline(2), b"xy")
337 self.assertEqual(f.readline(4), b"zzy\n")
338 self.assertEqual(f.readline(), b"foo\x00bar\n")
339 self.assertEqual(f.readline(), b"another line")
340 self.assertRaises(TypeError, f.readline, 5.3)
341 with self.open(support.TESTFN, "r") as f:
342 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000343
Guido van Rossum28524c72007-02-27 05:47:44 +0000344 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000345 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000346 self.write_ops(f)
347 data = f.getvalue()
348 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000349 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000350 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000351
Guido van Rossum53807da2007-04-10 19:01:47 +0000352 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000353 # On Windows and Mac OSX this test comsumes large resources; It takes
354 # a long time to build the >2GB file and takes >2GB of disk space
355 # therefore the resource must be enabled to run this test.
356 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000357 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000358 print("\nTesting large file ops skipped on %s." % sys.platform,
359 file=sys.stderr)
360 print("It requires %d bytes and a long time." % self.LARGE,
361 file=sys.stderr)
362 print("Use 'regrtest.py -u largefile test_io' to run it.",
363 file=sys.stderr)
364 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000365 with self.open(support.TESTFN, "w+b", 0) as f:
366 self.large_file_ops(f)
367 with self.open(support.TESTFN, "w+b") as f:
368 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000369
370 def test_with_open(self):
371 for bufsize in (0, 1, 100):
372 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000373 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000374 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000375 self.assertEqual(f.closed, True)
376 f = None
377 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000378 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000379 1/0
380 except ZeroDivisionError:
381 self.assertEqual(f.closed, True)
382 else:
383 self.fail("1/0 didn't raise an exception")
384
Antoine Pitrou08838b62009-01-21 00:55:13 +0000385 # issue 5008
386 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000387 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000388 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000389 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000390 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000391 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000392 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000393 with self.open(support.TESTFN, "a") as f:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000394 self.assertTrue(f.tell() > 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000395
Guido van Rossum87429772007-04-10 21:06:59 +0000396 def test_destructor(self):
397 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000398 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000399 def __del__(self):
400 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000401 try:
402 f = super().__del__
403 except AttributeError:
404 pass
405 else:
406 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000407 def close(self):
408 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000409 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000410 def flush(self):
411 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000412 super().flush()
413 f = MyFileIO(support.TESTFN, "wb")
414 f.write(b"xxx")
415 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000416 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000417 self.assertEqual(record, [1, 2, 3])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000418 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000419 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000420
421 def _check_base_destructor(self, base):
422 record = []
423 class MyIO(base):
424 def __init__(self):
425 # This exercises the availability of attributes on object
426 # destruction.
427 # (in the C version, close() is called by the tp_dealloc
428 # function, not by __del__)
429 self.on_del = 1
430 self.on_close = 2
431 self.on_flush = 3
432 def __del__(self):
433 record.append(self.on_del)
434 try:
435 f = super().__del__
436 except AttributeError:
437 pass
438 else:
439 f()
440 def close(self):
441 record.append(self.on_close)
442 super().close()
443 def flush(self):
444 record.append(self.on_flush)
445 super().flush()
446 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000447 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000448 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000449 self.assertEqual(record, [1, 2, 3])
450
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000451 def test_IOBase_destructor(self):
452 self._check_base_destructor(self.IOBase)
453
454 def test_RawIOBase_destructor(self):
455 self._check_base_destructor(self.RawIOBase)
456
457 def test_BufferedIOBase_destructor(self):
458 self._check_base_destructor(self.BufferedIOBase)
459
460 def test_TextIOBase_destructor(self):
461 self._check_base_destructor(self.TextIOBase)
462
Guido van Rossum87429772007-04-10 21:06:59 +0000463 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000464 with self.open(support.TESTFN, "wb") as f:
465 f.write(b"xxx")
466 with self.open(support.TESTFN, "rb") as f:
467 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000468
Guido van Rossumd4103952007-04-12 05:44:49 +0000469 def test_array_writes(self):
470 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000471 n = len(a.tostring())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000472 with self.open(support.TESTFN, "wb", 0) as f:
473 self.assertEqual(f.write(a), n)
474 with self.open(support.TESTFN, "wb") as f:
475 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000476
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000477 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000478 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000479 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000480
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000481 def test_read_closed(self):
482 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000483 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000484 with self.open(support.TESTFN, "r") as f:
485 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000486 self.assertEqual(file.read(), "egg\n")
487 file.seek(0)
488 file.close()
489 self.assertRaises(ValueError, file.read)
490
491 def test_no_closefd_with_filename(self):
492 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000493 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000494
495 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000496 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000497 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000498 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000499 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000500 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000501 self.assertEqual(file.buffer.raw.closefd, False)
502
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000503 def test_garbage_collection(self):
504 # FileIO objects are collected, and collecting them flushes
505 # all data to disk.
506 f = self.FileIO(support.TESTFN, "wb")
507 f.write(b"abcxxx")
508 f.f = f
509 wr = weakref.ref(f)
510 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000511 support.gc_collect()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000512 self.assertTrue(wr() is None, wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000513 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000514 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000515
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000516 def test_unbounded_file(self):
517 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
518 zero = "/dev/zero"
519 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000520 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000521 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000522 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000523 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000524 self.skipTest("test requires at least 2GB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000525 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000526 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000527 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000528 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000529 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000530 self.assertRaises(OverflowError, f.read)
531
class CIOTest(IOTest):
    """IOTest run, presumably, against the C implementation of io.

    NOTE(review): the attributes the tests use (self.open, self.FileIO,
    ...) are not set here; they appear to be bound elsewhere -- confirm.
    """
Guido van Rossuma9e20242007-03-08 00:43:48 +0000534
class PyIOTest(IOTest):
    """IOTest run, presumably, against the pure-Python implementation of io.

    NOTE(review): the attributes the tests use (self.open, self.FileIO,
    ...) are not set here; they appear to be bound elsewhere -- confirm.
    """
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000537
Guido van Rossuma9e20242007-03-08 00:43:48 +0000538
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000539class CommonBufferedTests:
540 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
541
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000542 def test_detach(self):
543 raw = self.MockRawIO()
544 buf = self.tp(raw)
545 self.assertIs(buf.detach(), raw)
546 self.assertRaises(ValueError, buf.detach)
547
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000548 def test_fileno(self):
549 rawio = self.MockRawIO()
550 bufio = self.tp(rawio)
551
552 self.assertEquals(42, bufio.fileno())
553
554 def test_no_fileno(self):
555 # XXX will we always have fileno() function? If so, kill
556 # this test. Else, write it.
557 pass
558
559 def test_invalid_args(self):
560 rawio = self.MockRawIO()
561 bufio = self.tp(rawio)
562 # Invalid whence
563 self.assertRaises(ValueError, bufio.seek, 0, -1)
564 self.assertRaises(ValueError, bufio.seek, 0, 3)
565
566 def test_override_destructor(self):
567 tp = self.tp
568 record = []
569 class MyBufferedIO(tp):
570 def __del__(self):
571 record.append(1)
572 try:
573 f = super().__del__
574 except AttributeError:
575 pass
576 else:
577 f()
578 def close(self):
579 record.append(2)
580 super().close()
581 def flush(self):
582 record.append(3)
583 super().flush()
584 rawio = self.MockRawIO()
585 bufio = MyBufferedIO(rawio)
586 writable = bufio.writable()
587 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000588 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000589 if writable:
590 self.assertEqual(record, [1, 2, 3])
591 else:
592 self.assertEqual(record, [1, 2])
593
594 def test_context_manager(self):
595 # Test usability as a context manager
596 rawio = self.MockRawIO()
597 bufio = self.tp(rawio)
598 def _with():
599 with bufio:
600 pass
601 _with()
602 # bufio should now be closed, and using it a second time should raise
603 # a ValueError.
604 self.assertRaises(ValueError, _with)
605
606 def test_error_through_destructor(self):
607 # Test that the exception state is not modified by a destructor,
608 # even if close() fails.
609 rawio = self.CloseFailureIO()
610 def f():
611 self.tp(rawio).xyzzy
612 with support.captured_output("stderr") as s:
613 self.assertRaises(AttributeError, f)
614 s = s.getvalue().strip()
615 if s:
616 # The destructor *may* have printed an unraisable error, check it
617 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000618 self.assertTrue(s.startswith("Exception IOError: "), s)
619 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000620
Antoine Pitrou716c4442009-05-23 19:04:03 +0000621 def test_repr(self):
622 raw = self.MockRawIO()
623 b = self.tp(raw)
624 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
625 self.assertEqual(repr(b), "<%s>" % clsname)
626 raw.name = "dummy"
627 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
628 raw.name = b"dummy"
629 self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
630
Guido van Rossum78892e42007-04-06 17:31:18 +0000631
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000632class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
633 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000634
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000635 def test_constructor(self):
636 rawio = self.MockRawIO([b"abc"])
637 bufio = self.tp(rawio)
638 bufio.__init__(rawio)
639 bufio.__init__(rawio, buffer_size=1024)
640 bufio.__init__(rawio, buffer_size=16)
641 self.assertEquals(b"abc", bufio.read())
642 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
643 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
644 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
645 rawio = self.MockRawIO([b"abc"])
646 bufio.__init__(rawio)
647 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000648
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000649 def test_read(self):
650 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
651 bufio = self.tp(rawio)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000652 self.assertEquals(b"abcdef", bufio.read(6))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000653 # Invalid args
654 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000655
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000656 def test_read1(self):
657 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
658 bufio = self.tp(rawio)
659 self.assertEquals(b"a", bufio.read(1))
660 self.assertEquals(b"b", bufio.read1(1))
661 self.assertEquals(rawio._reads, 1)
662 self.assertEquals(b"c", bufio.read1(100))
663 self.assertEquals(rawio._reads, 1)
664 self.assertEquals(b"d", bufio.read1(100))
665 self.assertEquals(rawio._reads, 2)
666 self.assertEquals(b"efg", bufio.read1(100))
667 self.assertEquals(rawio._reads, 3)
668 self.assertEquals(b"", bufio.read1(100))
669 # Invalid args
670 self.assertRaises(ValueError, bufio.read1, -1)
671
672 def test_readinto(self):
673 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
674 bufio = self.tp(rawio)
675 b = bytearray(2)
676 self.assertEquals(bufio.readinto(b), 2)
677 self.assertEquals(b, b"ab")
678 self.assertEquals(bufio.readinto(b), 2)
679 self.assertEquals(b, b"cd")
680 self.assertEquals(bufio.readinto(b), 2)
681 self.assertEquals(b, b"ef")
682 self.assertEquals(bufio.readinto(b), 1)
683 self.assertEquals(b, b"gf")
684 self.assertEquals(bufio.readinto(b), 0)
685 self.assertEquals(b, b"gf")
686
687 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000688 data = b"abcdefghi"
689 dlen = len(data)
690
691 tests = [
692 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
693 [ 100, [ 3, 3, 3], [ dlen ] ],
694 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
695 ]
696
697 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000698 rawio = self.MockFileIO(data)
699 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000700 pos = 0
701 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000702 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000703 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000704 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000705 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000706
def test_read_non_blocking(self):
    # Inject some None's in there to simulate EWOULDBLOCK
    rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
    bufio = self.tp(rawio)

    # The read stops short at the first None instead of waiting for 6 bytes.
    self.assertEqual(b"abcd", bufio.read(6))
    self.assertEqual(b"e", bufio.read(1))
    self.assertEqual(b"fg", bufio.read())
    self.assertEqual(b"", bufio.peek(1))
    # With nothing buffered and the raw stream "blocking", read() gives None;
    # once the mock is exhausted it gives b"".
    self.assertTrue(None is bufio.read())
    self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000718
def test_read_past_eof(self):
    # Asking for far more bytes than exist returns just the available data.
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)

    self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000724
def test_read_all(self):
    # read() without a size concatenates every chunk of the raw stream.
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)

    self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000730
def test_threads(self):
    # Read one buffered stream from 20 threads at once and check that the
    # union of everything read is exactly the file's contents.
    try:
        # The file holds every byte value 0..255 exactly N times, so a
        # duplicated or dropped read shows up as a wrong per-value count.
        N = 1000
        values = list(range(256)) * N
        random.shuffle(values)
        payload = bytes(bytearray(values))
        with self.open(support.TESTFN, "wb") as out:
            out.write(payload)
        with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []
            results = []

            def reader():
                try:
                    # Intra-buffer read then buffer-flushing read
                    for size in cycle([1, 19]):
                        chunk = bufio.read(size)
                        if not chunk:
                            break
                        # list.append() is atomic
                        results.append(chunk)
                except Exception as exc:
                    # Record the failure for the main thread, then let the
                    # worker die with the original exception.
                    errors.append(exc)
                    raise

            workers = [threading.Thread(target=reader) for _ in range(20)]
            for worker in workers:
                worker.start()
            time.sleep(0.02)  # yield
            for worker in workers:
                worker.join()
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            combined = b''.join(results)
            for value in range(256):
                needle = bytes(bytearray([value]))
                self.assertEqual(combined.count(needle), N)
    finally:
        support.unlink(support.TESTFN)
772
def test_misbehaved_io(self):
    # Over a MisbehavedRawIO stream, both seek() and tell() on the buffered
    # object are expected to raise IOError.
    bufio = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
    for failing_call, args in ((bufio.seek, (0,)), (bufio.tell, ())):
        self.assertRaises(IOError, failing_call, *args)
778
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReaderTest suite against io.BufferedReader and
    # adds checks that only apply to that implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialization the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        try:
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            # Create a reference cycle so only the cyclic GC can free it.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
            self.assertTrue(wr() is None, wr)
        finally:
            # Opening in "w+b" created the file; do not leave it behind.
            support.unlink(support.TESTFN)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000819
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited BufferedReaderTest tests with `tp` bound to the
    # BufferedReader class from `pyio`.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000822
Guido van Rossuma9e20242007-03-08 00:43:48 +0000823
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Tests for a buffered writer type.  The class under test is read from
    # `self.tp`; the mock raw streams and `self.open` are looked up on `self`
    # as well (they are not defined in this class).
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called again on a live object; invalid buffer
        # sizes raise ValueError, and a later valid call makes it usable.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        # detach() must push buffered data to the raw stream first.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # `intermediate_func(bufio)` is called after every write so callers
        # can interleave another operation (flush, seek, ...).
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek away and back (absolute, then relative) between writes.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Seeking must flush pending data so later writes land correctly.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the last reference must flush buffered data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        try:
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                bufio.write(b"abcdef")
                self.assertEqual(bufio.truncate(3), 3)
                self.assertEqual(bufio.tell(), 3)
            with self.open(support.TESTFN, "rb", buffering=0) as f:
                self.assertEqual(f.read(), b"abc")
        finally:
            # Same cleanup as test_threads: do not leave the file behind.
            support.unlink(support.TESTFN)

    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunk order is arbitrary, so only per-value counts are checked.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit exactly one
        # DeprecationWarning with the expected message.
        with support.check_warnings() as w:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), 8, 12)
            self.assertEqual(len(w.warnings), 1)
            warning = w.warnings[0]
            self.assertTrue(warning.category is DeprecationWarning)
            self.assertEqual(str(warning.message),
                             "max_buffer_size is deprecated")
1042
1043
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriterTest suite against io.BufferedWriter and
    # adds checks that only apply to that implementation.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialization the object must refuse to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        try:
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            # Create a reference cycle so only the cyclic GC can free it.
            f.x = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
            self.assertTrue(wr() is None, wr)
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"123xxx")
        finally:
            # Opening in "w+b" created the file; do not leave it behind.
            support.unlink(support.TESTFN)
1081
1082
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the inherited BufferedWriterTest tests with `tp` bound to the
    # BufferedWriter class from `pyio`.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001085
class BufferedRWPairTest(unittest.TestCase):
    # Tests for the reader/writer pair type found in `self.tp`.

    def test_constructor(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)

    def test_detach(self):
        # A pair has no single underlying stream to hand back.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, rw_pair.detach)

    def test_constructor_max_buffer_size_deprecation(self):
        # A fourth positional argument must emit exactly one
        # DeprecationWarning with the expected message.
        with support.check_warnings() as caught:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
            self.assertEqual(len(caught.warnings), 1)
            first = caught.warnings[0]
            self.assertTrue(first.category is DeprecationWarning)
            self.assertEqual(str(first.message),
                             "max_buffer_size is deprecated")

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        unreadable = NotReadable()
        self.assertRaises(IOError, self.tp, unreadable, self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        reader = self.MockRawIO()
        self.assertRaises(IOError, self.tp, reader, NotWriteable())

    def test_read(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        for size_args, expected in (((3,), b"abc"), ((1,), b"d"), ((), b"ef")):
            self.assertEqual(rw_pair.read(*size_args), expected)

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(rw_pair.read1(3), b"abc")

    def test_readinto(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        target = bytearray(5)
        self.assertEqual(rw_pair.readinto(target), 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        rw_pair = self.tp(self.MockRawIO(), sink)

        # Each write is flushed separately, so it arrives as its own chunk.
        for chunk in (b"abc", b"def"):
            rw_pair.write(chunk)
            rw_pair.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than requested and must not consume data.
        peeked = rw_pair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        self.assertEqual(rw_pair.read(3), b"abc")

    def test_readable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.readable())

    def test_writeable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)
        rw_pair.close()
        self.assertTrue(rw_pair.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side is one.
        combos = [(False, False), (True, False), (False, True), (True, True)]
        for reader_tty, writer_tty in combos:
            rw_pair = self.tp(SelectableIsAtty(reader_tty),
                              SelectableIsAtty(writer_tty))
            if reader_tty or writer_tty:
                self.assertTrue(rw_pair.isatty())
            else:
                self.assertFalse(rw_pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001200
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the inherited BufferedRWPairTest tests with `tp` bound to
    # io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001203
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the inherited BufferedRWPairTest tests with `tp` bound to the
    # BufferedRWPair class from `pyio`.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001206
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001207
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Tests for a buffered type that both reads and writes.  Inherits the
    # reader and writer suites and adds tests that mix the two directions.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        # Reading after buffered writes forces the pending data out first.
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        # A float offset is rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario for the test_flush_and_* tests.
        # `read_func(bufio, [n])` must return the next bytes of the stream
        # and advance the position past them.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # With no size given, use a target larger than the test data.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position; do it by hand.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        # Interleave peek() with writes, first in place and then one byte
        # behind the write position.
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        # Step back one byte and re-read it after every write.
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)
1334
class CBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandomTest suite against io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        acceptable_errors = (OverflowError, MemoryError, ValueError)
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(acceptable_errors,
                          bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the reader check first, then the writer check.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)
1351
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the inherited BufferedRandomTest tests with `tp` bound to the
    # BufferedRandom class from `pyio`.
    tp = pyio.BufferedRandom
1354
1355
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001356# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1357# properties:
1358# - A single output character can correspond to many bytes of input.
1359# - The number of input bytes to complete the character can be
1360# undetermined until the last input byte is received.
1361# - The number of input bytes can vary depending on previous input.
1362# - A single input byte can correspond to many characters of output.
1363# - The number of output characters can be undetermined until the
1364# last input byte is received.
1365# - The number of output characters can vary depending on previous input.
1366
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  An 'i' or 'o' word produces no output
    and only affects the words that follow it.  EOF also terminates the
    last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and no buffered bytes."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed into flags."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input*; return the text of all completed words.

        If *final* is true, any partially buffered word is completed too.
        """
        pieces = []
        for b in input:
            if self.i == 0: # variable-length, terminated with period
                if b == ord('.'):
                    # A period with nothing buffered is an extra one: skip.
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer: # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the output it produces.

        'i'/'o' words update the lengths and yield ''; any other word is
        decoded as ASCII, padded/truncated to O and followed by a period.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # lookupTestDecoder() only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: serve 'test_decoder' while enabled.

        Returns a CodecInfo whose incremental decoder is this class, or
        None (implicitly) when disabled or for any other name.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
1451
# Register the lookup function of the decoder defined above as a codec
# search function.  It only answers for the name 'test_decoder', and only
# while StatefulIncrementalDecoder.codecEnabled is true.
# Disabled by default, tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1455
1456
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, eof, expected in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(data, eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001499
1500class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001501
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001502 def setUp(self):
1503 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1504 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001505 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001506
Guido van Rossumd0712812007-04-11 16:32:43 +00001507 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001508 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001509
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001510 def test_constructor(self):
1511 r = self.BytesIO(b"\xc3\xa9\n\n")
1512 b = self.BufferedReader(r, 1000)
1513 t = self.TextIOWrapper(b)
1514 t.__init__(b, encoding="latin1", newline="\r\n")
1515 self.assertEquals(t.encoding, "latin1")
1516 self.assertEquals(t.line_buffering, False)
1517 t.__init__(b, encoding="utf8", line_buffering=True)
1518 self.assertEquals(t.encoding, "utf8")
1519 self.assertEquals(t.line_buffering, True)
1520 self.assertEquals("\xe9\n", t.readline())
1521 self.assertRaises(TypeError, t.__init__, b, newline=42)
1522 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1523
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001524 def test_detach(self):
1525 r = self.BytesIO()
1526 b = self.BufferedWriter(r)
1527 t = self.TextIOWrapper(b)
1528 self.assertIs(t.detach(), b)
1529
1530 t = self.TextIOWrapper(b, encoding="ascii")
1531 t.write("howdy")
1532 self.assertFalse(r.getvalue())
1533 t.detach()
1534 self.assertEqual(r.getvalue(), b"howdy")
1535 self.assertRaises(ValueError, t.detach)
1536
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001537 def test_repr(self):
1538 raw = self.BytesIO("hello".encode("utf-8"))
1539 b = self.BufferedReader(raw)
1540 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00001541 modname = self.TextIOWrapper.__module__
1542 self.assertEqual(repr(t),
1543 "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1544 raw.name = "dummy"
1545 self.assertEqual(repr(t),
1546 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1547 raw.name = b"dummy"
1548 self.assertEqual(repr(t),
1549 "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001550
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001551 def test_line_buffering(self):
1552 r = self.BytesIO()
1553 b = self.BufferedWriter(r, 1000)
1554 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001555 t.write("X")
1556 self.assertEquals(r.getvalue(), b"") # No flush happened
1557 t.write("Y\nZ")
1558 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1559 t.write("A\rB")
1560 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1561
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001562 def test_encoding(self):
1563 # Check the encoding attribute is always set, and valid
1564 b = self.BytesIO()
1565 t = self.TextIOWrapper(b, encoding="utf8")
1566 self.assertEqual(t.encoding, "utf8")
1567 t = self.TextIOWrapper(b)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001568 self.assertTrue(t.encoding is not None)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001569 codecs.lookup(t.encoding)
1570
1571 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001572 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001573 b = self.BytesIO(b"abc\n\xff\n")
1574 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001575 self.assertRaises(UnicodeError, t.read)
1576 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001577 b = self.BytesIO(b"abc\n\xff\n")
1578 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001579 self.assertRaises(UnicodeError, t.read)
1580 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001581 b = self.BytesIO(b"abc\n\xff\n")
1582 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001583 self.assertEquals(t.read(), "abc\n\n")
1584 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001585 b = self.BytesIO(b"abc\n\xff\n")
1586 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001587 self.assertEquals(t.read(), "abc\n\ufffd\n")
1588
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001589 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001590 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001591 b = self.BytesIO()
1592 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001593 self.assertRaises(UnicodeError, t.write, "\xff")
1594 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001595 b = self.BytesIO()
1596 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001597 self.assertRaises(UnicodeError, t.write, "\xff")
1598 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001599 b = self.BytesIO()
1600 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001601 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001602 t.write("abc\xffdef\n")
1603 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001604 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001605 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001606 b = self.BytesIO()
1607 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001608 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001609 t.write("abc\xffdef\n")
1610 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001611 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001612
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001613 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001614 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1615
1616 tests = [
1617 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001618 [ '', input_lines ],
1619 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1620 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1621 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001622 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001623 encodings = (
1624 'utf-8', 'latin-1',
1625 'utf-16', 'utf-16-le', 'utf-16-be',
1626 'utf-32', 'utf-32-le', 'utf-32-be',
1627 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001628
Guido van Rossum8358db22007-08-18 21:39:55 +00001629 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001630 # character in TextIOWrapper._pending_line.
1631 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001632 # XXX: str.encode() should return bytes
1633 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001634 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001635 for bufsize in range(1, 10):
1636 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001637 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1638 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001639 encoding=encoding)
1640 if do_reads:
1641 got_lines = []
1642 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001643 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001644 if c2 == '':
1645 break
1646 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001647 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001648 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001649 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001650
1651 for got_line, exp_line in zip(got_lines, exp_lines):
1652 self.assertEquals(got_line, exp_line)
1653 self.assertEquals(len(got_lines), len(exp_lines))
1654
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001655 def test_newlines_input(self):
1656 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001657 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1658 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001659 (None, normalized.decode("ascii").splitlines(True)),
1660 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001661 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1662 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1663 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001664 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001665 buf = self.BytesIO(testdata)
1666 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001667 self.assertEquals(txt.readlines(), expected)
1668 txt.seek(0)
1669 self.assertEquals(txt.read(), "".join(expected))
1670
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001671 def test_newlines_output(self):
1672 testdict = {
1673 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1674 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1675 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1676 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1677 }
1678 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1679 for newline, expected in tests:
1680 buf = self.BytesIO()
1681 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1682 txt.write("AAA\nB")
1683 txt.write("BB\nCCC\n")
1684 txt.write("X\rY\r\nZ")
1685 txt.flush()
1686 self.assertEquals(buf.closed, False)
1687 self.assertEquals(buf.getvalue(), expected)
1688
1689 def test_destructor(self):
1690 l = []
1691 base = self.BytesIO
1692 class MyBytesIO(base):
1693 def close(self):
1694 l.append(self.getvalue())
1695 base.close(self)
1696 b = MyBytesIO()
1697 t = self.TextIOWrapper(b, encoding="ascii")
1698 t.write("abc")
1699 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001700 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001701 self.assertEquals([b"abc"], l)
1702
1703 def test_override_destructor(self):
1704 record = []
1705 class MyTextIO(self.TextIOWrapper):
1706 def __del__(self):
1707 record.append(1)
1708 try:
1709 f = super().__del__
1710 except AttributeError:
1711 pass
1712 else:
1713 f()
1714 def close(self):
1715 record.append(2)
1716 super().close()
1717 def flush(self):
1718 record.append(3)
1719 super().flush()
1720 b = self.BytesIO()
1721 t = MyTextIO(b, encoding="ascii")
1722 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001723 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001724 self.assertEqual(record, [1, 2, 3])
1725
1726 def test_error_through_destructor(self):
1727 # Test that the exception state is not modified by a destructor,
1728 # even if close() fails.
1729 rawio = self.CloseFailureIO()
1730 def f():
1731 self.TextIOWrapper(rawio).xyzzy
1732 with support.captured_output("stderr") as s:
1733 self.assertRaises(AttributeError, f)
1734 s = s.getvalue().strip()
1735 if s:
1736 # The destructor *may* have printed an unraisable error, check it
1737 self.assertEqual(len(s.splitlines()), 1)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001738 self.assertTrue(s.startswith("Exception IOError: "), s)
1739 self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001740
Guido van Rossum9b76da62007-04-11 01:09:03 +00001741 # Systematic tests of the text I/O API
1742
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001743 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001744 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1745 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001746 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001747 f._CHUNK_SIZE = chunksize
1748 self.assertEquals(f.write("abc"), 3)
1749 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001750 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001751 f._CHUNK_SIZE = chunksize
1752 self.assertEquals(f.tell(), 0)
1753 self.assertEquals(f.read(), "abc")
1754 cookie = f.tell()
1755 self.assertEquals(f.seek(0), 0)
1756 self.assertEquals(f.read(2), "ab")
1757 self.assertEquals(f.read(1), "c")
1758 self.assertEquals(f.read(1), "")
1759 self.assertEquals(f.read(), "")
1760 self.assertEquals(f.tell(), cookie)
1761 self.assertEquals(f.seek(0), 0)
1762 self.assertEquals(f.seek(0, 2), cookie)
1763 self.assertEquals(f.write("def"), 3)
1764 self.assertEquals(f.seek(cookie), cookie)
1765 self.assertEquals(f.read(), "def")
1766 if enc.startswith("utf"):
1767 self.multi_line_test(f, enc)
1768 f.close()
1769
1770 def multi_line_test(self, f, enc):
1771 f.seek(0)
1772 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001773 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001774 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001775 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001776 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001777 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001778 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001779 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001780 wlines.append((f.tell(), line))
1781 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001782 f.seek(0)
1783 rlines = []
1784 while True:
1785 pos = f.tell()
1786 line = f.readline()
1787 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001788 break
1789 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +00001790 self.assertEquals(rlines, wlines)
1791
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001792 def test_telling(self):
1793 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001794 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001795 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001796 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001797 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001798 p2 = f.tell()
1799 f.seek(0)
1800 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001801 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001802 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001803 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001804 self.assertEquals(f.tell(), p2)
1805 f.seek(0)
1806 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001807 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001808 self.assertRaises(IOError, f.tell)
1809 self.assertEquals(f.tell(), p2)
1810 f.close()
1811
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001812 def test_seeking(self):
1813 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001814 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001815 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001816 prefix = bytes(u_prefix.encode("utf-8"))
1817 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001818 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001819 suffix = bytes(u_suffix.encode("utf-8"))
1820 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001821 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001822 f.write(line*2)
1823 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001824 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001825 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001826 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001827 self.assertEquals(f.tell(), prefix_size)
1828 self.assertEquals(f.readline(), u_suffix)
1829
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001830 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00001831 # Regression test for a specific bug
1832 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001833 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001834 f.write(data)
1835 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001836 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001837 f._CHUNK_SIZE # Just test that it exists
1838 f._CHUNK_SIZE = 2
1839 f.readline()
1840 f.tell()
1841
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001842 def test_seek_and_tell(self):
1843 #Test seek/tell using the StatefulIncrementalDecoder.
1844 # Make test faster by doing smaller seeks
1845 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001846
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001847 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001848 """Tell/seek to various points within a data stream and ensure
1849 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001850 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001851 f.write(data)
1852 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001853 f = self.open(support.TESTFN, encoding='test_decoder')
1854 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001855 decoded = f.read()
1856 f.close()
1857
Neal Norwitze2b07052008-03-18 19:52:05 +00001858 for i in range(min_pos, len(decoded) + 1): # seek positions
1859 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001860 f = self.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001861 self.assertEquals(f.read(i), decoded[:i])
1862 cookie = f.tell()
1863 self.assertEquals(f.read(j), decoded[i:i + j])
1864 f.seek(cookie)
1865 self.assertEquals(f.read(), decoded[i:])
1866 f.close()
1867
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001868 # Enable the test decoder.
1869 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001870
1871 # Run the tests.
1872 try:
1873 # Try each test case.
1874 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001875 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001876
1877 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001878 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1879 offset = CHUNK_SIZE - len(input)//2
1880 prefix = b'.'*offset
1881 # Don't bother seeking into the prefix (takes too long).
1882 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001883 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001884
1885 # Ensure our test decoder won't interfere with subsequent tests.
1886 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001887 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001888
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001889 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001890 data = "1234567890"
1891 tests = ("utf-16",
1892 "utf-16-le",
1893 "utf-16-be",
1894 "utf-32",
1895 "utf-32-le",
1896 "utf-32-be")
1897 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001898 buf = self.BytesIO()
1899 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001900 # Check if the BOM is written only once (see issue1753).
1901 f.write(data)
1902 f.write(data)
1903 f.seek(0)
1904 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00001905 f.seek(0)
1906 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001907 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1908
Benjamin Petersona1b49012009-03-31 23:11:32 +00001909 def test_unreadable(self):
1910 class UnReadable(self.BytesIO):
1911 def readable(self):
1912 return False
1913 txt = self.TextIOWrapper(UnReadable())
1914 self.assertRaises(IOError, txt.read)
1915
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001916 def test_read_one_by_one(self):
1917 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001918 reads = ""
1919 while True:
1920 c = txt.read(1)
1921 if not c:
1922 break
1923 reads += c
1924 self.assertEquals(reads, "AA\nBB")
1925
1926 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001927 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001928 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001929 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001930 reads = ""
1931 while True:
1932 c = txt.read(128)
1933 if not c:
1934 break
1935 reads += c
1936 self.assertEquals(reads, "A"*127+"\nB")
1937
1938 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001939 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001940
1941 # read one char at a time
1942 reads = ""
1943 while True:
1944 c = txt.read(1)
1945 if not c:
1946 break
1947 reads += c
1948 self.assertEquals(reads, self.normalized)
1949
1950 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001951 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001952 txt._CHUNK_SIZE = 4
1953
1954 reads = ""
1955 while True:
1956 c = txt.read(4)
1957 if not c:
1958 break
1959 reads += c
1960 self.assertEquals(reads, self.normalized)
1961
1962 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001963 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001964 txt._CHUNK_SIZE = 4
1965
1966 reads = txt.read(4)
1967 reads += txt.read(4)
1968 reads += txt.readline()
1969 reads += txt.readline()
1970 reads += txt.readline()
1971 self.assertEquals(reads, self.normalized)
1972
1973 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001974 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001975 txt._CHUNK_SIZE = 4
1976
1977 reads = txt.read(4)
1978 reads += txt.read()
1979 self.assertEquals(reads, self.normalized)
1980
1981 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001982 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001983 txt._CHUNK_SIZE = 4
1984
1985 reads = txt.read(4)
1986 pos = txt.tell()
1987 txt.seek(0)
1988 txt.seek(pos)
1989 self.assertEquals(txt.read(4), "BBB\n")
1990
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001991 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001992 buffer = self.BytesIO(self.testdata)
1993 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001994
1995 self.assertEqual(buffer.seekable(), txt.seekable())
1996
Antoine Pitroue4501852009-05-14 18:55:55 +00001997 def test_append_bom(self):
1998 # The BOM is not written again when appending to a non-empty file
1999 filename = support.TESTFN
2000 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2001 with self.open(filename, 'w', encoding=charset) as f:
2002 f.write('aaa')
2003 pos = f.tell()
2004 with self.open(filename, 'rb') as f:
2005 self.assertEquals(f.read(), 'aaa'.encode(charset))
2006
2007 with self.open(filename, 'a', encoding=charset) as f:
2008 f.write('xxx')
2009 with self.open(filename, 'rb') as f:
2010 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
2011
2012 def test_seek_bom(self):
2013 # Same test, but when seeking manually
2014 filename = support.TESTFN
2015 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2016 with self.open(filename, 'w', encoding=charset) as f:
2017 f.write('aaa')
2018 pos = f.tell()
2019 with self.open(filename, 'r+', encoding=charset) as f:
2020 f.seek(pos)
2021 f.write('zzz')
2022 f.seek(0)
2023 f.write('bbb')
2024 with self.open(filename, 'rb') as f:
2025 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2026
Benjamin Peterson0926ad12009-06-06 18:02:12 +00002027 def test_errors_property(self):
2028 with self.open(support.TESTFN, "w") as f:
2029 self.assertEqual(f.errors, "strict")
2030 with self.open(support.TESTFN, "w", errors="replace") as f:
2031 self.assertEqual(f.errors, "replace")
2032
Antoine Pitroue4501852009-05-14 18:55:55 +00002033
class CTextIOWrapperTest(TextIOWrapperTest):
    # Extra TextIOWrapper checks kept apart from the shared suite.

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # After a failed re-initialisation the wrapper must refuse to read.
        for exc_type, bad_newline in ((TypeError, 42), (ValueError, 'xyzzy')):
            self.assertRaises(exc_type, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(rawio)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Make the wrapper part of a reference cycle so that only the
        # garbage collector can reclaim it.
        wrapper.x = wrapper
        wr = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as written:
            self.assertEqual(written.read(), b"456def")
2060
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Adds no tests of its own: everything is inherited from
    # TextIOWrapperTest.  Presumably this variant is pointed at the
    # pure-Python implementation elsewhere in the file -- confirm there.
    pass
2063
2064
2065class IncrementalNewlineDecoderTest(unittest.TestCase):
2066
2067 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002068 # UTF-8 specific tests for a newline decoder
2069 def _check_decode(b, s, **kwargs):
2070 # We exercise getstate() / setstate() as well as decode()
2071 state = decoder.getstate()
2072 self.assertEquals(decoder.decode(b, **kwargs), s)
2073 decoder.setstate(state)
2074 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002075
Antoine Pitrou180a3362008-12-14 16:36:46 +00002076 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002077
Antoine Pitrou180a3362008-12-14 16:36:46 +00002078 _check_decode(b'\xe8', "")
2079 _check_decode(b'\xa2', "")
2080 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002081
Antoine Pitrou180a3362008-12-14 16:36:46 +00002082 _check_decode(b'\xe8', "")
2083 _check_decode(b'\xa2', "")
2084 _check_decode(b'\x88', "\u8888")
2085
2086 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002087 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2088
Antoine Pitrou180a3362008-12-14 16:36:46 +00002089 decoder.reset()
2090 _check_decode(b'\n', "\n")
2091 _check_decode(b'\r', "")
2092 _check_decode(b'', "\n", final=True)
2093 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002094
Antoine Pitrou180a3362008-12-14 16:36:46 +00002095 _check_decode(b'\r', "")
2096 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002097
Antoine Pitrou180a3362008-12-14 16:36:46 +00002098 _check_decode(b'\r\r\n', "\n\n")
2099 _check_decode(b'\r', "")
2100 _check_decode(b'\r', "\n")
2101 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002102
Antoine Pitrou180a3362008-12-14 16:36:46 +00002103 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2104 _check_decode(b'\xe8\xa2\x88', "\u8888")
2105 _check_decode(b'\n', "\n")
2106 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2107 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002108
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002109 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002110 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002111 if encoding is not None:
2112 encoder = codecs.getincrementalencoder(encoding)()
2113 def _decode_bytewise(s):
2114 # Decode one byte at a time
2115 for b in encoder.encode(s):
2116 result.append(decoder.decode(bytes([b])))
2117 else:
2118 encoder = None
2119 def _decode_bytewise(s):
2120 # Decode one char at a time
2121 for c in s:
2122 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002123 self.assertEquals(decoder.newlines, None)
2124 _decode_bytewise("abc\n\r")
2125 self.assertEquals(decoder.newlines, '\n')
2126 _decode_bytewise("\nabc")
2127 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2128 _decode_bytewise("abc\r")
2129 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2130 _decode_bytewise("abc")
2131 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2132 _decode_bytewise("abc\r")
2133 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2134 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002135 input = "abc"
2136 if encoder is not None:
2137 encoder.reset()
2138 input = encoder.encode(input)
2139 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002140 self.assertEquals(decoder.newlines, None)
2141
def test_newline_decoder(self):
    """Run check_newline_decoding() for str input and several codecs,
    then the UTF-8 specific check.
    """
    # None meaning the IncrementalNewlineDecoder takes unicode input
    # rather than bytes input
    encodings = (
        None, 'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )
    for enc in encodings:
        # A falsy encoding is handed through unchanged as the inner decoder.
        inner = codecs.getincrementaldecoder(enc)() if enc else enc
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding(wrapped, enc)

    utf8_inner = codecs.getincrementaldecoder("utf-8")()
    utf8_wrapped = self.IncrementalNewlineDecoder(utf8_inner, translate=True)
    self.check_newline_decoding_utf8(utf8_wrapped)
2157
def test_newline_bytes(self):
    """Characters that merely contain a CR/LF byte value are not newlines.

    U+0D00 and U+0A00 have 0x0D and 0x0A as their high byte.  Both must be
    passed through unchanged and must not be recorded in ``newlines``,
    with and without newline translation.
    """
    # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
    def _check(dec):
        self.assertEqual(dec.newlines, None)
        self.assertEqual(dec.decode("\u0D00"), "\u0D00")
        self.assertEqual(dec.newlines, None)
        self.assertEqual(dec.decode("\u0A00"), "\u0A00")
        self.assertEqual(dec.newlines, None)

    for translate in (False, True):
        _check(self.IncrementalNewlineDecoder(None, translate=translate))
2170
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest bound to the ``io`` module.

    Adds no tests of its own; test_main() attaches the ``io`` module's
    names to every test class whose name starts with "C".
    """
2173
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest bound to the ``pyio`` module.

    Adds no tests of its own; test_main() attaches the ``pyio`` module's
    names to every test class whose name starts with "Py".
    """
Antoine Pitrou180a3362008-12-14 16:36:46 +00002176
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002177
Guido van Rossum01a27522007-03-07 01:00:12 +00002178# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002179
class MiscIOTest(unittest.TestCase):
    """Miscellaneous checks on the io namespace, file attributes and ABCs.

    The implementation under test is reached through attributes of the test
    class (``self.io``, ``self.open``, ``self.IOBase``, ...).  ``io`` is set
    by the C/Py subclasses; the remaining names are attached to those
    subclasses by test_main() before the suite runs.
    """

    def tearDown(self):
        # The tests below create the scratch file; remove it after each one.
        support.unlink(support.TESTFN)

    def test___all__(self):
        """Every name in ``__all__`` exists and is of the expected kind."""
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase), name)

    def test_attributes(self):
        """Check ``name`` and ``mode`` on each layer of an opened file."""
        # Context managers make sure a failing assertion cannot leave a
        # file object open.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second file object on f's descriptor; when opened from a
            # descriptor, ``name`` is that descriptor number.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        """Every I/O method raises ValueError once the file is closed."""
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek, read1 and readinto only exist on some layers.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Write bytes to binary files and str to text files.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        """Check BlockingIOError argument handling and garbage collection."""
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)

        # Build a reference cycle between the exception and its message
        # argument, then check that the collector can reclaim it.
        class C(str):
            pass
        c = C("")
        b = self.BlockingIOError(1, c)
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertTrue(wr() is None, wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))

    def _check_abc_inheritance(self, abcmodule):
        """Check files from self.open() against the ABCs of *abcmodule*.

        An unbuffered binary file must be a RawIOBase only, a buffered
        binary file a BufferedIOBase only and a text file a TextIOBase
        only; all three must be IOBase instances.
        """
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertTrue(isinstance(f, abcmodule.RawIOBase))
            self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
            self.assertFalse(isinstance(f, abcmodule.TextIOBase))
        with self.open(support.TESTFN, "wb") as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertFalse(isinstance(f, abcmodule.RawIOBase))
            self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
            self.assertFalse(isinstance(f, abcmodule.TextIOBase))
        with self.open(support.TESTFN, "w") as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertFalse(isinstance(f, abcmodule.RawIOBase))
            self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
            self.assertTrue(isinstance(f, abcmodule.TextIOBase))

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)
2315
class CMiscIOTest(MiscIOTest):
    # Expose the module-level ``io`` as ``self.io`` for the inherited tests.
    io = io
2318
class PyMiscIOTest(MiscIOTest):
    # Expose the module-level ``pyio`` as ``self.io`` for the inherited tests.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002321
def test_main():
    """Attach the implementation under test to each test class, then run.

    Classes whose name starts with "C" receive the names of ``io`` plus the
    "C"-prefixed mock classes; classes whose name starts with "Py" receive
    the names of ``pyio`` plus the "Py"-prefixed mocks.  Other classes are
    left untouched.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,)

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {}
    for member in all_members:
        c_io_ns[member] = getattr(io, member)
    py_io_ns = {}
    for member in all_members:
        py_io_ns[member] = getattr(pyio, member)

    # Each mock is published under its unprefixed name, but bound to the
    # implementation-specific "C..." / "Py..." variant from this module.
    module_globals = globals()
    for mock in mocks:
        c_io_ns[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mocks:
        py_io_ns[mock.__name__] = module_globals["Py" + mock.__name__]

    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        test_name = test.__name__
        if test_name.startswith("C"):
            namespace = c_io_ns
        elif test_name.startswith("Py"):
            namespace = py_io_ns
        else:
            continue
        for attr, value in namespace.items():
            setattr(test, attr, value)

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002354
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()