blob: 08e0f1396f4edacdb4360ccc40249e9ef1965607 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
33from itertools import chain, cycle, count
34from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000035from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000036
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000037import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000038import io # C implementation of io
39import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000040
Guido van Rossuma9e20242007-03-08 00:43:48 +000041
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042def _default_chunk_size():
43 """Get the default TextIOWrapper chunk size"""
44 with open(__file__, "r", encoding="latin1") as f:
45 return f._CHUNK_SIZE
46
47
48class MockRawIO:
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000050 def __init__(self, read_stack=()):
51 self._read_stack = list(read_stack)
52 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000053 self._reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000054
55 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000056 self._reads += 1
Guido van Rossum68bbcd22007-02-27 17:19:33 +000057 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000058 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000059 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000060 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000061
Guido van Rossum01a27522007-03-07 01:00:12 +000062 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000063 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000064 return len(b)
65
66 def writable(self):
67 return True
68
Guido van Rossum68bbcd22007-02-27 17:19:33 +000069 def fileno(self):
70 return 42
71
72 def readable(self):
73 return True
74
Guido van Rossum01a27522007-03-07 01:00:12 +000075 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000076 return True
77
Guido van Rossum01a27522007-03-07 01:00:12 +000078 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000079 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000080
81 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000082 return 0 # same comment as above
83
84 def readinto(self, buf):
85 self._reads += 1
86 max_len = len(buf)
87 try:
88 data = self._read_stack[0]
89 except IndexError:
90 return 0
91 if data is None:
92 del self._read_stack[0]
93 return None
94 n = len(data)
95 if len(data) <= max_len:
96 del self._read_stack[0]
97 buf[:n] = data
98 return n
99 else:
100 buf[:] = data[:max_len]
101 self._read_stack[0] = data[max_len:]
102 return max_len
103
104 def truncate(self, pos=None):
105 return pos
106
107class CMockRawIO(MockRawIO, io.RawIOBase):
108 pass
109
110class PyMockRawIO(MockRawIO, pyio.RawIOBase):
111 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000112
Guido van Rossuma9e20242007-03-08 00:43:48 +0000113
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000114class MisbehavedRawIO(MockRawIO):
115 def write(self, b):
116 return super().write(b) * 2
117
118 def read(self, n=None):
119 return super().read(n) * 2
120
121 def seek(self, pos, whence):
122 return -123
123
124 def tell(self):
125 return -456
126
127 def readinto(self, buf):
128 super().readinto(buf)
129 return len(buf) * 5
130
131class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
132 pass
133
134class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
135 pass
136
137
138class CloseFailureIO(MockRawIO):
139 closed = 0
140
141 def close(self):
142 if not self.closed:
143 self.closed = 1
144 raise IOError
145
146class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
147 pass
148
149class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
150 pass
151
152
153class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000154
155 def __init__(self, data):
156 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000157 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000158
159 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000160 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000161 self.read_history.append(None if res is None else len(res))
162 return res
163
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000164 def readinto(self, b):
165 res = super().readinto(b)
166 self.read_history.append(res)
167 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000168
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169class CMockFileIO(MockFileIO, io.BytesIO):
170 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000171
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000172class PyMockFileIO(MockFileIO, pyio.BytesIO):
173 pass
174
175
176class MockNonBlockWriterIO:
177
178 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000179 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000181
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000182 def pop_written(self):
183 s = b"".join(self._write_stack)
184 self._write_stack[:] = []
185 return s
186
187 def block_on(self, char):
188 """Block when a given char is encountered."""
189 self._blocker_char = char
190
191 def readable(self):
192 return True
193
194 def seekable(self):
195 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000196
Guido van Rossum01a27522007-03-07 01:00:12 +0000197 def writable(self):
198 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000200 def write(self, b):
201 b = bytes(b)
202 n = -1
203 if self._blocker_char:
204 try:
205 n = b.index(self._blocker_char)
206 except ValueError:
207 pass
208 else:
209 self._blocker_char = None
210 self._write_stack.append(b[:n])
211 raise self.BlockingIOError(0, "test blocking", n)
212 self._write_stack.append(b)
213 return len(b)
214
215class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
216 BlockingIOError = io.BlockingIOError
217
218class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
219 BlockingIOError = pyio.BlockingIOError
220
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
Guido van Rossum28524c72007-02-27 05:47:44 +0000222class IOTest(unittest.TestCase):
223
Neal Norwitze7789b12008-03-24 06:18:09 +0000224 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000225 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000226
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000227 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000228 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000229
Guido van Rossum28524c72007-02-27 05:47:44 +0000230 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000231 self.assertEqual(f.write(b"blah."), 5)
232 self.assertEqual(f.seek(0), 0)
233 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000234 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000235 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000236 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000237 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000238 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000239 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000240 self.assertEqual(f.seek(-1, 2), 13)
241 self.assertEqual(f.tell(), 13)
242 self.assertEqual(f.truncate(12), 12)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000243 self.assertEqual(f.tell(), 12)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000244 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000245
Guido van Rossum9b76da62007-04-11 01:09:03 +0000246 def read_ops(self, f, buffered=False):
247 data = f.read(5)
248 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000249 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000250 self.assertEqual(f.readinto(data), 5)
251 self.assertEqual(data, b" worl")
252 self.assertEqual(f.readinto(data), 2)
253 self.assertEqual(len(data), 5)
254 self.assertEqual(data[:2], b"d\n")
255 self.assertEqual(f.seek(0), 0)
256 self.assertEqual(f.read(20), b"hello world\n")
257 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000258 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000259 self.assertEqual(f.seek(-6, 2), 6)
260 self.assertEqual(f.read(5), b"world")
261 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000262 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000263 self.assertEqual(f.seek(-6, 1), 5)
264 self.assertEqual(f.read(5), b" worl")
265 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000266 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000267 if buffered:
268 f.seek(0)
269 self.assertEqual(f.read(), b"hello world\n")
270 f.seek(6)
271 self.assertEqual(f.read(), b"world\n")
272 self.assertEqual(f.read(), b"")
273
Guido van Rossum34d69e52007-04-10 20:08:41 +0000274 LARGE = 2**31
275
Guido van Rossum53807da2007-04-10 19:01:47 +0000276 def large_file_ops(self, f):
277 assert f.readable()
278 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000279 self.assertEqual(f.seek(self.LARGE), self.LARGE)
280 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000281 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000282 self.assertEqual(f.tell(), self.LARGE + 3)
283 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000285 self.assertEqual(f.tell(), self.LARGE + 2)
286 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000287 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000288 self.assertEqual(f.tell(), self.LARGE + 1)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000289 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
290 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000291 self.assertEqual(f.read(2), b"x")
292
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000293 def test_invalid_operations(self):
294 # Try writing on a file opened in read mode and vice-versa.
295 for mode in ("w", "wb"):
296 with open(support.TESTFN, mode) as fp:
297 self.assertRaises(IOError, fp.read)
298 self.assertRaises(IOError, fp.readline)
299 with open(support.TESTFN, "rb") as fp:
300 self.assertRaises(IOError, fp.write, b"blah")
301 self.assertRaises(IOError, fp.writelines, [b"blah\n"])
302 with open(support.TESTFN, "r") as fp:
303 self.assertRaises(IOError, fp.write, "blah")
304 self.assertRaises(IOError, fp.writelines, ["blah\n"])
305
Guido van Rossum28524c72007-02-27 05:47:44 +0000306 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000307 with self.open(support.TESTFN, "wb", buffering=0) as f:
308 self.assertEqual(f.readable(), False)
309 self.assertEqual(f.writable(), True)
310 self.assertEqual(f.seekable(), True)
311 self.write_ops(f)
312 with self.open(support.TESTFN, "rb", buffering=0) as f:
313 self.assertEqual(f.readable(), True)
314 self.assertEqual(f.writable(), False)
315 self.assertEqual(f.seekable(), True)
316 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000317
Guido van Rossum87429772007-04-10 21:06:59 +0000318 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000319 with self.open(support.TESTFN, "wb") as f:
320 self.assertEqual(f.readable(), False)
321 self.assertEqual(f.writable(), True)
322 self.assertEqual(f.seekable(), True)
323 self.write_ops(f)
324 with self.open(support.TESTFN, "rb") as f:
325 self.assertEqual(f.readable(), True)
326 self.assertEqual(f.writable(), False)
327 self.assertEqual(f.seekable(), True)
328 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000329
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000330 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000331 with self.open(support.TESTFN, "wb") as f:
332 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
333 with self.open(support.TESTFN, "rb") as f:
334 self.assertEqual(f.readline(), b"abc\n")
335 self.assertEqual(f.readline(10), b"def\n")
336 self.assertEqual(f.readline(2), b"xy")
337 self.assertEqual(f.readline(4), b"zzy\n")
338 self.assertEqual(f.readline(), b"foo\x00bar\n")
339 self.assertEqual(f.readline(), b"another line")
340 self.assertRaises(TypeError, f.readline, 5.3)
341 with self.open(support.TESTFN, "r") as f:
342 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000343
Guido van Rossum28524c72007-02-27 05:47:44 +0000344 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000345 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000346 self.write_ops(f)
347 data = f.getvalue()
348 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000349 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000350 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000351
Guido van Rossum53807da2007-04-10 19:01:47 +0000352 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000353 # On Windows and Mac OSX this test comsumes large resources; It takes
354 # a long time to build the >2GB file and takes >2GB of disk space
355 # therefore the resource must be enabled to run this test.
356 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000357 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000358 print("\nTesting large file ops skipped on %s." % sys.platform,
359 file=sys.stderr)
360 print("It requires %d bytes and a long time." % self.LARGE,
361 file=sys.stderr)
362 print("Use 'regrtest.py -u largefile test_io' to run it.",
363 file=sys.stderr)
364 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000365 with self.open(support.TESTFN, "w+b", 0) as f:
366 self.large_file_ops(f)
367 with self.open(support.TESTFN, "w+b") as f:
368 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000369
370 def test_with_open(self):
371 for bufsize in (0, 1, 100):
372 f = None
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000373 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000374 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000375 self.assertEqual(f.closed, True)
376 f = None
377 try:
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000378 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000379 1/0
380 except ZeroDivisionError:
381 self.assertEqual(f.closed, True)
382 else:
383 self.fail("1/0 didn't raise an exception")
384
Antoine Pitrou08838b62009-01-21 00:55:13 +0000385 # issue 5008
386 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000387 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000388 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000389 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000390 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000391 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000392 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000393 with self.open(support.TESTFN, "a") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000394 self.assert_(f.tell() > 0)
395
Guido van Rossum87429772007-04-10 21:06:59 +0000396 def test_destructor(self):
397 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000398 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000399 def __del__(self):
400 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000401 try:
402 f = super().__del__
403 except AttributeError:
404 pass
405 else:
406 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000407 def close(self):
408 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000409 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000410 def flush(self):
411 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000412 super().flush()
413 f = MyFileIO(support.TESTFN, "wb")
414 f.write(b"xxx")
415 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000416 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000417 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson45cec322009-04-24 23:14:50 +0000418 with open(support.TESTFN, "rb") as f:
419 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000420
421 def _check_base_destructor(self, base):
422 record = []
423 class MyIO(base):
424 def __init__(self):
425 # This exercises the availability of attributes on object
426 # destruction.
427 # (in the C version, close() is called by the tp_dealloc
428 # function, not by __del__)
429 self.on_del = 1
430 self.on_close = 2
431 self.on_flush = 3
432 def __del__(self):
433 record.append(self.on_del)
434 try:
435 f = super().__del__
436 except AttributeError:
437 pass
438 else:
439 f()
440 def close(self):
441 record.append(self.on_close)
442 super().close()
443 def flush(self):
444 record.append(self.on_flush)
445 super().flush()
446 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000447 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000448 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000449 self.assertEqual(record, [1, 2, 3])
450
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000451 def test_IOBase_destructor(self):
452 self._check_base_destructor(self.IOBase)
453
454 def test_RawIOBase_destructor(self):
455 self._check_base_destructor(self.RawIOBase)
456
457 def test_BufferedIOBase_destructor(self):
458 self._check_base_destructor(self.BufferedIOBase)
459
460 def test_TextIOBase_destructor(self):
461 self._check_base_destructor(self.TextIOBase)
462
Guido van Rossum87429772007-04-10 21:06:59 +0000463 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000464 with self.open(support.TESTFN, "wb") as f:
465 f.write(b"xxx")
466 with self.open(support.TESTFN, "rb") as f:
467 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000468
Guido van Rossumd4103952007-04-12 05:44:49 +0000469 def test_array_writes(self):
470 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000471 n = len(a.tostring())
Benjamin Peterson45cec322009-04-24 23:14:50 +0000472 with self.open(support.TESTFN, "wb", 0) as f:
473 self.assertEqual(f.write(a), n)
474 with self.open(support.TESTFN, "wb") as f:
475 self.assertEqual(f.write(a), n)
Guido van Rossumd4103952007-04-12 05:44:49 +0000476
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000477 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000478 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000479 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000480
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000481 def test_read_closed(self):
482 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000483 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000484 with self.open(support.TESTFN, "r") as f:
485 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000486 self.assertEqual(file.read(), "egg\n")
487 file.seek(0)
488 file.close()
489 self.assertRaises(ValueError, file.read)
490
491 def test_no_closefd_with_filename(self):
492 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000493 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000494
495 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000496 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000497 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000498 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000499 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000500 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000501 self.assertEqual(file.buffer.raw.closefd, False)
502
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000503 def test_garbage_collection(self):
504 # FileIO objects are collected, and collecting them flushes
505 # all data to disk.
506 f = self.FileIO(support.TESTFN, "wb")
507 f.write(b"abcxxx")
508 f.f = f
509 wr = weakref.ref(f)
510 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000511 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000512 self.assert_(wr() is None, wr)
513 with open(support.TESTFN, "rb") as f:
514 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000515
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000516 def test_unbounded_file(self):
517 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
518 zero = "/dev/zero"
519 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000520 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000521 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000522 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000523 if support.real_max_memuse < support._2G:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000524 self.skipTest("test requires at least 2GB of memory")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000525 with open(zero, "rb", buffering=0) as f:
526 self.assertRaises(OverflowError, f.read)
527 with open(zero, "rb") as f:
528 self.assertRaises(OverflowError, f.read)
529 with open(zero, "r") as f:
530 self.assertRaises(OverflowError, f.read)
531
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000532class CIOTest(IOTest):
533 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000534
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000535class PyIOTest(IOTest):
536 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000537
Guido van Rossuma9e20242007-03-08 00:43:48 +0000538
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000539class CommonBufferedTests:
540 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
541
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000542 def test_detach(self):
543 raw = self.MockRawIO()
544 buf = self.tp(raw)
545 self.assertIs(buf.detach(), raw)
546 self.assertRaises(ValueError, buf.detach)
547
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000548 def test_fileno(self):
549 rawio = self.MockRawIO()
550 bufio = self.tp(rawio)
551
552 self.assertEquals(42, bufio.fileno())
553
554 def test_no_fileno(self):
555 # XXX will we always have fileno() function? If so, kill
556 # this test. Else, write it.
557 pass
558
559 def test_invalid_args(self):
560 rawio = self.MockRawIO()
561 bufio = self.tp(rawio)
562 # Invalid whence
563 self.assertRaises(ValueError, bufio.seek, 0, -1)
564 self.assertRaises(ValueError, bufio.seek, 0, 3)
565
566 def test_override_destructor(self):
567 tp = self.tp
568 record = []
569 class MyBufferedIO(tp):
570 def __del__(self):
571 record.append(1)
572 try:
573 f = super().__del__
574 except AttributeError:
575 pass
576 else:
577 f()
578 def close(self):
579 record.append(2)
580 super().close()
581 def flush(self):
582 record.append(3)
583 super().flush()
584 rawio = self.MockRawIO()
585 bufio = MyBufferedIO(rawio)
586 writable = bufio.writable()
587 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000588 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000589 if writable:
590 self.assertEqual(record, [1, 2, 3])
591 else:
592 self.assertEqual(record, [1, 2])
593
594 def test_context_manager(self):
595 # Test usability as a context manager
596 rawio = self.MockRawIO()
597 bufio = self.tp(rawio)
598 def _with():
599 with bufio:
600 pass
601 _with()
602 # bufio should now be closed, and using it a second time should raise
603 # a ValueError.
604 self.assertRaises(ValueError, _with)
605
606 def test_error_through_destructor(self):
607 # Test that the exception state is not modified by a destructor,
608 # even if close() fails.
609 rawio = self.CloseFailureIO()
610 def f():
611 self.tp(rawio).xyzzy
612 with support.captured_output("stderr") as s:
613 self.assertRaises(AttributeError, f)
614 s = s.getvalue().strip()
615 if s:
616 # The destructor *may* have printed an unraisable error, check it
617 self.assertEqual(len(s.splitlines()), 1)
618 self.assert_(s.startswith("Exception IOError: "), s)
619 self.assert_(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000620
621
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000622class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
623 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000624
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000625 def test_constructor(self):
626 rawio = self.MockRawIO([b"abc"])
627 bufio = self.tp(rawio)
628 bufio.__init__(rawio)
629 bufio.__init__(rawio, buffer_size=1024)
630 bufio.__init__(rawio, buffer_size=16)
631 self.assertEquals(b"abc", bufio.read())
632 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
633 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
634 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
635 rawio = self.MockRawIO([b"abc"])
636 bufio.__init__(rawio)
637 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000638
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000639 def test_read(self):
640 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
641 bufio = self.tp(rawio)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000642 self.assertEquals(b"abcdef", bufio.read(6))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000643 # Invalid args
644 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000645
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000646 def test_read1(self):
647 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
648 bufio = self.tp(rawio)
649 self.assertEquals(b"a", bufio.read(1))
650 self.assertEquals(b"b", bufio.read1(1))
651 self.assertEquals(rawio._reads, 1)
652 self.assertEquals(b"c", bufio.read1(100))
653 self.assertEquals(rawio._reads, 1)
654 self.assertEquals(b"d", bufio.read1(100))
655 self.assertEquals(rawio._reads, 2)
656 self.assertEquals(b"efg", bufio.read1(100))
657 self.assertEquals(rawio._reads, 3)
658 self.assertEquals(b"", bufio.read1(100))
659 # Invalid args
660 self.assertRaises(ValueError, bufio.read1, -1)
661
662 def test_readinto(self):
663 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
664 bufio = self.tp(rawio)
665 b = bytearray(2)
666 self.assertEquals(bufio.readinto(b), 2)
667 self.assertEquals(b, b"ab")
668 self.assertEquals(bufio.readinto(b), 2)
669 self.assertEquals(b, b"cd")
670 self.assertEquals(bufio.readinto(b), 2)
671 self.assertEquals(b, b"ef")
672 self.assertEquals(bufio.readinto(b), 1)
673 self.assertEquals(b, b"gf")
674 self.assertEquals(bufio.readinto(b), 0)
675 self.assertEquals(b, b"gf")
676
677 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000678 data = b"abcdefghi"
679 dlen = len(data)
680
681 tests = [
682 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
683 [ 100, [ 3, 3, 3], [ dlen ] ],
684 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
685 ]
686
687 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000688 rawio = self.MockFileIO(data)
689 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000690 pos = 0
691 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000692 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000693 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000694 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000695 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000696
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000697 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000698 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000699 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
700 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000701
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000702 self.assertEquals(b"abcd", bufio.read(6))
703 self.assertEquals(b"e", bufio.read(1))
704 self.assertEquals(b"fg", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000705 self.assertEquals(b"", bufio.peek(1))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000706 self.assert_(None is bufio.read())
707 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000708
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000709 def test_read_past_eof(self):
710 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
711 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000712
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000713 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000714
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000715 def test_read_all(self):
716 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
717 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000718
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000719 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000720
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000721 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000722 try:
723 # Write out many bytes with exactly the same number of 0's,
724 # 1's... 255's. This will help us check that concurrent reading
725 # doesn't duplicate or forget contents.
726 N = 1000
727 l = list(range(256)) * N
728 random.shuffle(l)
729 s = bytes(bytearray(l))
730 with io.open(support.TESTFN, "wb") as f:
731 f.write(s)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000732 with io.open(support.TESTFN, self.read_mode, buffering=0) as raw:
733 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000734 errors = []
735 results = []
736 def f():
737 try:
738 # Intra-buffer read then buffer-flushing read
739 for n in cycle([1, 19]):
740 s = bufio.read(n)
741 if not s:
742 break
743 # list.append() is atomic
744 results.append(s)
745 except Exception as e:
746 errors.append(e)
747 raise
748 threads = [threading.Thread(target=f) for x in range(20)]
749 for t in threads:
750 t.start()
751 time.sleep(0.02) # yield
752 for t in threads:
753 t.join()
754 self.assertFalse(errors,
755 "the following exceptions were caught: %r" % errors)
756 s = b''.join(results)
757 for i in range(256):
758 c = bytes(bytearray([i]))
759 self.assertEqual(s.count(c), N)
760 finally:
761 support.unlink(support.TESTFN)
762
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000763 def test_misbehaved_io(self):
764 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
765 bufio = self.tp(rawio)
766 self.assertRaises(IOError, bufio.seek, 0)
767 self.assertRaises(IOError, bufio.tell)
768
769class CBufferedReaderTest(BufferedReaderTest):
770 tp = io.BufferedReader
771
772 def test_constructor(self):
773 BufferedReaderTest.test_constructor(self)
774 # The allocation can succeed on 32-bit builds, e.g. with more
775 # than 2GB RAM and a 64-bit kernel.
776 if sys.maxsize > 0x7FFFFFFF:
777 rawio = self.MockRawIO()
778 bufio = self.tp(rawio)
779 self.assertRaises((OverflowError, MemoryError, ValueError),
780 bufio.__init__, rawio, sys.maxsize)
781
782 def test_initialization(self):
783 rawio = self.MockRawIO([b"abc"])
784 bufio = self.tp(rawio)
785 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
786 self.assertRaises(ValueError, bufio.read)
787 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
788 self.assertRaises(ValueError, bufio.read)
789 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
790 self.assertRaises(ValueError, bufio.read)
791
792 def test_misbehaved_io_read(self):
793 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
794 bufio = self.tp(rawio)
795 # _pyio.BufferedReader seems to implement reading different, so that
796 # checking this is not so easy.
797 self.assertRaises(IOError, bufio.read, 10)
798
799 def test_garbage_collection(self):
800 # C BufferedReader objects are collected.
801 # The Python version has __del__, so it ends into gc.garbage instead
802 rawio = self.FileIO(support.TESTFN, "w+b")
803 f = self.tp(rawio)
804 f.f = f
805 wr = weakref.ref(f)
806 del f
Benjamin Peterson45cec322009-04-24 23:14:50 +0000807 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000808 self.assert_(wr() is None, wr)
809
810class PyBufferedReaderTest(BufferedReaderTest):
811 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000812
Guido van Rossuma9e20242007-03-08 00:43:48 +0000813
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000814class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
815 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000816
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000817 def test_constructor(self):
818 rawio = self.MockRawIO()
819 bufio = self.tp(rawio)
820 bufio.__init__(rawio)
821 bufio.__init__(rawio, buffer_size=1024)
822 bufio.__init__(rawio, buffer_size=16)
823 self.assertEquals(3, bufio.write(b"abc"))
824 bufio.flush()
825 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
826 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
827 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
828 bufio.__init__(rawio)
829 self.assertEquals(3, bufio.write(b"ghi"))
830 bufio.flush()
831 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
832
Benjamin Petersond2e0c792009-05-01 20:40:59 +0000833 def test_detach_flush(self):
834 raw = self.MockRawIO()
835 buf = self.tp(raw)
836 buf.write(b"howdy!")
837 self.assertFalse(raw._write_stack)
838 buf.detach()
839 self.assertEqual(raw._write_stack, [b"howdy!"])
840
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000841 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000842 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000843 writer = self.MockRawIO()
844 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000845 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000846 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000847
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000848 def test_write_overflow(self):
849 writer = self.MockRawIO()
850 bufio = self.tp(writer, 8)
851 contents = b"abcdefghijklmnop"
852 for n in range(0, len(contents), 3):
853 bufio.write(contents[n:n+3])
854 flushed = b"".join(writer._write_stack)
855 # At least (total - 8) bytes were implicitly flushed, perhaps more
856 # depending on the implementation.
857 self.assert_(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000858
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000859 def check_writes(self, intermediate_func):
860 # Lots of writes, test the flushed output is as expected.
861 contents = bytes(range(256)) * 1000
862 n = 0
863 writer = self.MockRawIO()
864 bufio = self.tp(writer, 13)
865 # Generator of write sizes: repeat each N 15 times then proceed to N+1
866 def gen_sizes():
867 for size in count(1):
868 for i in range(15):
869 yield size
870 sizes = gen_sizes()
871 while n < len(contents):
872 size = min(next(sizes), len(contents) - n)
873 self.assertEquals(bufio.write(contents[n:n+size]), size)
874 intermediate_func(bufio)
875 n += size
876 bufio.flush()
877 self.assertEquals(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000878
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000879 def test_writes(self):
880 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000881
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000882 def test_writes_and_flushes(self):
883 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +0000884
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000885 def test_writes_and_seeks(self):
886 def _seekabs(bufio):
887 pos = bufio.tell()
888 bufio.seek(pos + 1, 0)
889 bufio.seek(pos - 1, 0)
890 bufio.seek(pos, 0)
891 self.check_writes(_seekabs)
892 def _seekrel(bufio):
893 pos = bufio.seek(0, 1)
894 bufio.seek(+1, 1)
895 bufio.seek(-1, 1)
896 bufio.seek(pos, 0)
897 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +0000898
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000899 def test_writes_and_truncates(self):
900 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +0000901
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000902 def test_write_non_blocking(self):
903 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +0000904 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +0000905
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000906 self.assertEquals(bufio.write(b"abcd"), 4)
907 self.assertEquals(bufio.write(b"efghi"), 5)
908 # 1 byte will be written, the rest will be buffered
909 raw.block_on(b"k")
910 self.assertEquals(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +0000911
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000912 # 8 bytes will be written, 8 will be buffered and the rest will be lost
913 raw.block_on(b"0")
914 try:
915 bufio.write(b"opqrwxyz0123456789")
916 except self.BlockingIOError as e:
917 written = e.characters_written
918 else:
919 self.fail("BlockingIOError should have been raised")
920 self.assertEquals(written, 16)
921 self.assertEquals(raw.pop_written(),
922 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +0000923
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000924 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
925 s = raw.pop_written()
926 # Previously buffered bytes were flushed
927 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +0000928
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000929 def test_write_and_rewind(self):
930 raw = io.BytesIO()
931 bufio = self.tp(raw, 4)
932 self.assertEqual(bufio.write(b"abcdef"), 6)
933 self.assertEqual(bufio.tell(), 6)
934 bufio.seek(0, 0)
935 self.assertEqual(bufio.write(b"XY"), 2)
936 bufio.seek(6, 0)
937 self.assertEqual(raw.getvalue(), b"XYcdef")
938 self.assertEqual(bufio.write(b"123456"), 6)
939 bufio.flush()
940 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000941
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000942 def test_flush(self):
943 writer = self.MockRawIO()
944 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000945 bufio.write(b"abc")
946 bufio.flush()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000947 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000948
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000949 def test_destructor(self):
950 writer = self.MockRawIO()
951 bufio = self.tp(writer, 8)
952 bufio.write(b"abc")
953 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000954 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000955 self.assertEquals(b"abc", writer._write_stack[0])
956
957 def test_truncate(self):
958 # Truncate implicitly flushes the buffer.
959 with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
960 bufio = self.tp(raw, 8)
961 bufio.write(b"abcdef")
962 self.assertEqual(bufio.truncate(3), 3)
963 self.assertEqual(bufio.tell(), 3)
964 with io.open(support.TESTFN, "rb", buffering=0) as f:
965 self.assertEqual(f.read(), b"abc")
966
967 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000968 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000969 # Write out many bytes from many threads and test they were
970 # all flushed.
971 N = 1000
972 contents = bytes(range(256)) * N
973 sizes = cycle([1, 19])
974 n = 0
975 queue = deque()
976 while n < len(contents):
977 size = next(sizes)
978 queue.append(contents[n:n+size])
979 n += size
980 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +0000981 # We use a real file object because it allows us to
982 # exercise situations where the GIL is released before
983 # writing the buffer to the raw streams. This is in addition
984 # to concurrency issues due to switching threads in the middle
985 # of Python code.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000986 with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
987 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000988 errors = []
989 def f():
990 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000991 while True:
992 try:
993 s = queue.popleft()
994 except IndexError:
995 return
Antoine Pitrou87695762008-08-14 22:44:29 +0000996 bufio.write(s)
997 except Exception as e:
998 errors.append(e)
999 raise
1000 threads = [threading.Thread(target=f) for x in range(20)]
1001 for t in threads:
1002 t.start()
1003 time.sleep(0.02) # yield
1004 for t in threads:
1005 t.join()
1006 self.assertFalse(errors,
1007 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001008 bufio.close()
1009 with io.open(support.TESTFN, "rb") as f:
1010 s = f.read()
1011 for i in range(256):
1012 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001013 finally:
1014 support.unlink(support.TESTFN)
1015
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001016 def test_misbehaved_io(self):
1017 rawio = self.MisbehavedRawIO()
1018 bufio = self.tp(rawio, 5)
1019 self.assertRaises(IOError, bufio.seek, 0)
1020 self.assertRaises(IOError, bufio.tell)
1021 self.assertRaises(IOError, bufio.write, b"abcdef")
1022
Benjamin Peterson59406a92009-03-26 17:10:29 +00001023 def test_max_buffer_size_deprecation(self):
1024 with support.check_warnings() as w:
1025 warnings.simplefilter("always", DeprecationWarning)
1026 self.tp(self.MockRawIO(), 8, 12)
1027 self.assertEqual(len(w.warnings), 1)
1028 warning = w.warnings[0]
1029 self.assertTrue(warning.category is DeprecationWarning)
1030 self.assertEqual(str(warning.message),
1031 "max_buffer_size is deprecated")
1032
1033
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001034class CBufferedWriterTest(BufferedWriterTest):
1035 tp = io.BufferedWriter
1036
1037 def test_constructor(self):
1038 BufferedWriterTest.test_constructor(self)
1039 # The allocation can succeed on 32-bit builds, e.g. with more
1040 # than 2GB RAM and a 64-bit kernel.
1041 if sys.maxsize > 0x7FFFFFFF:
1042 rawio = self.MockRawIO()
1043 bufio = self.tp(rawio)
1044 self.assertRaises((OverflowError, MemoryError, ValueError),
1045 bufio.__init__, rawio, sys.maxsize)
1046
1047 def test_initialization(self):
1048 rawio = self.MockRawIO()
1049 bufio = self.tp(rawio)
1050 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1051 self.assertRaises(ValueError, bufio.write, b"def")
1052 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1053 self.assertRaises(ValueError, bufio.write, b"def")
1054 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1055 self.assertRaises(ValueError, bufio.write, b"def")
1056
1057 def test_garbage_collection(self):
1058 # C BufferedWriter objects are collected, and collecting them flushes
1059 # all data to disk.
1060 # The Python version has __del__, so it ends into gc.garbage instead
1061 rawio = self.FileIO(support.TESTFN, "w+b")
1062 f = self.tp(rawio)
1063 f.write(b"123xxx")
1064 f.x = f
1065 wr = weakref.ref(f)
1066 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001067 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001068 self.assert_(wr() is None, wr)
1069 with open(support.TESTFN, "rb") as f:
1070 self.assertEqual(f.read(), b"123xxx")
1071
1072
1073class PyBufferedWriterTest(BufferedWriterTest):
1074 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001075
Guido van Rossum01a27522007-03-07 01:00:12 +00001076class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001077
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001078 def test_constructor(self):
1079 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001080 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001081
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001082 def test_detach(self):
1083 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1084 self.assertRaises(self.UnsupportedOperation, pair.detach)
1085
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001086 def test_constructor_max_buffer_size_deprecation(self):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001087 with support.check_warnings() as w:
1088 warnings.simplefilter("always", DeprecationWarning)
1089 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1090 self.assertEqual(len(w.warnings), 1)
1091 warning = w.warnings[0]
1092 self.assertTrue(warning.category is DeprecationWarning)
1093 self.assertEqual(str(warning.message),
1094 "max_buffer_size is deprecated")
1095
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001096 def test_constructor_with_not_readable(self):
1097 class NotReadable(MockRawIO):
1098 def readable(self):
1099 return False
1100
1101 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1102
1103 def test_constructor_with_not_writeable(self):
1104 class NotWriteable(MockRawIO):
1105 def writable(self):
1106 return False
1107
1108 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1109
1110 def test_read(self):
1111 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1112
1113 self.assertEqual(pair.read(3), b"abc")
1114 self.assertEqual(pair.read(1), b"d")
1115 self.assertEqual(pair.read(), b"ef")
1116
1117 def test_read1(self):
1118 # .read1() is delegated to the underlying reader object, so this test
1119 # can be shallow.
1120 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1121
1122 self.assertEqual(pair.read1(3), b"abc")
1123
1124 def test_readinto(self):
1125 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1126
1127 data = bytearray(5)
1128 self.assertEqual(pair.readinto(data), 5)
1129 self.assertEqual(data, b"abcde")
1130
1131 def test_write(self):
1132 w = self.MockRawIO()
1133 pair = self.tp(self.MockRawIO(), w)
1134
1135 pair.write(b"abc")
1136 pair.flush()
1137 pair.write(b"def")
1138 pair.flush()
1139 self.assertEqual(w._write_stack, [b"abc", b"def"])
1140
1141 def test_peek(self):
1142 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1143
1144 self.assertTrue(pair.peek(3).startswith(b"abc"))
1145 self.assertEqual(pair.read(3), b"abc")
1146
1147 def test_readable(self):
1148 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1149 self.assertTrue(pair.readable())
1150
1151 def test_writeable(self):
1152 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1153 self.assertTrue(pair.writable())
1154
1155 def test_seekable(self):
1156 # BufferedRWPairs are never seekable, even if their readers and writers
1157 # are.
1158 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1159 self.assertFalse(pair.seekable())
1160
1161 # .flush() is delegated to the underlying writer object and has been
1162 # tested in the test_write method.
1163
1164 def test_close_and_closed(self):
1165 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1166 self.assertFalse(pair.closed)
1167 pair.close()
1168 self.assertTrue(pair.closed)
1169
1170 def test_isatty(self):
1171 class SelectableIsAtty(MockRawIO):
1172 def __init__(self, isatty):
1173 MockRawIO.__init__(self)
1174 self._isatty = isatty
1175
1176 def isatty(self):
1177 return self._isatty
1178
1179 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1180 self.assertFalse(pair.isatty())
1181
1182 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1183 self.assertTrue(pair.isatty())
1184
1185 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1186 self.assertTrue(pair.isatty())
1187
1188 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1189 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001190
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001191class CBufferedRWPairTest(BufferedRWPairTest):
1192 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001193
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001194class PyBufferedRWPairTest(BufferedRWPairTest):
1195 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001196
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001197
1198class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1199 read_mode = "rb+"
1200 write_mode = "wb+"
1201
1202 def test_constructor(self):
1203 BufferedReaderTest.test_constructor(self)
1204 BufferedWriterTest.test_constructor(self)
1205
1206 def test_read_and_write(self):
1207 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001208 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001209
1210 self.assertEqual(b"as", rw.read(2))
1211 rw.write(b"ddd")
1212 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001213 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001214 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001215 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001216
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001217 def test_seek_and_tell(self):
1218 raw = self.BytesIO(b"asdfghjkl")
1219 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001220
1221 self.assertEquals(b"as", rw.read(2))
1222 self.assertEquals(2, rw.tell())
1223 rw.seek(0, 0)
1224 self.assertEquals(b"asdf", rw.read(4))
1225
1226 rw.write(b"asdf")
1227 rw.seek(0, 0)
1228 self.assertEquals(b"asdfasdfl", rw.read())
1229 self.assertEquals(9, rw.tell())
1230 rw.seek(-4, 2)
1231 self.assertEquals(5, rw.tell())
1232 rw.seek(2, 1)
1233 self.assertEquals(7, rw.tell())
1234 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001235 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001236
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001237 def check_flush_and_read(self, read_func):
1238 raw = self.BytesIO(b"abcdefghi")
1239 bufio = self.tp(raw)
1240
1241 self.assertEquals(b"ab", read_func(bufio, 2))
1242 bufio.write(b"12")
1243 self.assertEquals(b"ef", read_func(bufio, 2))
1244 self.assertEquals(6, bufio.tell())
1245 bufio.flush()
1246 self.assertEquals(6, bufio.tell())
1247 self.assertEquals(b"ghi", read_func(bufio))
1248 raw.seek(0, 0)
1249 raw.write(b"XYZ")
1250 # flush() resets the read buffer
1251 bufio.flush()
1252 bufio.seek(0, 0)
1253 self.assertEquals(b"XYZ", read_func(bufio, 3))
1254
1255 def test_flush_and_read(self):
1256 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1257
1258 def test_flush_and_readinto(self):
1259 def _readinto(bufio, n=-1):
1260 b = bytearray(n if n >= 0 else 9999)
1261 n = bufio.readinto(b)
1262 return bytes(b[:n])
1263 self.check_flush_and_read(_readinto)
1264
1265 def test_flush_and_peek(self):
1266 def _peek(bufio, n=-1):
1267 # This relies on the fact that the buffer can contain the whole
1268 # raw stream, otherwise peek() can return less.
1269 b = bufio.peek(n)
1270 if n != -1:
1271 b = b[:n]
1272 bufio.seek(len(b), 1)
1273 return b
1274 self.check_flush_and_read(_peek)
1275
1276 def test_flush_and_write(self):
1277 raw = self.BytesIO(b"abcdefghi")
1278 bufio = self.tp(raw)
1279
1280 bufio.write(b"123")
1281 bufio.flush()
1282 bufio.write(b"45")
1283 bufio.flush()
1284 bufio.seek(0, 0)
1285 self.assertEquals(b"12345fghi", raw.getvalue())
1286 self.assertEquals(b"12345fghi", bufio.read())
1287
1288 def test_threads(self):
1289 BufferedReaderTest.test_threads(self)
1290 BufferedWriterTest.test_threads(self)
1291
1292 def test_writes_and_peek(self):
1293 def _peek(bufio):
1294 bufio.peek(1)
1295 self.check_writes(_peek)
1296 def _peek(bufio):
1297 pos = bufio.tell()
1298 bufio.seek(-1, 1)
1299 bufio.peek(1)
1300 bufio.seek(pos, 0)
1301 self.check_writes(_peek)
1302
1303 def test_writes_and_reads(self):
1304 def _read(bufio):
1305 bufio.seek(-1, 1)
1306 bufio.read(1)
1307 self.check_writes(_read)
1308
1309 def test_writes_and_read1s(self):
1310 def _read1(bufio):
1311 bufio.seek(-1, 1)
1312 bufio.read1(1)
1313 self.check_writes(_read1)
1314
1315 def test_writes_and_readintos(self):
1316 def _read(bufio):
1317 bufio.seek(-1, 1)
1318 bufio.readinto(bytearray(1))
1319 self.check_writes(_read)
1320
1321 def test_misbehaved_io(self):
1322 BufferedReaderTest.test_misbehaved_io(self)
1323 BufferedWriterTest.test_misbehaved_io(self)
1324
1325class CBufferedRandomTest(BufferedRandomTest):
1326 tp = io.BufferedRandom
1327
1328 def test_constructor(self):
1329 BufferedRandomTest.test_constructor(self)
1330 # The allocation can succeed on 32-bit builds, e.g. with more
1331 # than 2GB RAM and a 64-bit kernel.
1332 if sys.maxsize > 0x7FFFFFFF:
1333 rawio = self.MockRawIO()
1334 bufio = self.tp(rawio)
1335 self.assertRaises((OverflowError, MemoryError, ValueError),
1336 bufio.__init__, rawio, sys.maxsize)
1337
1338 def test_garbage_collection(self):
1339 CBufferedReaderTest.test_garbage_collection(self)
1340 CBufferedWriterTest.test_garbage_collection(self)
1341
1342class PyBufferedRandomTest(BufferedRandomTest):
1343 tp = pyio.BufferedRandom
1344
1345
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001346# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1347# properties:
1348# - A single output character can correspond to many bytes of input.
1349# - The number of input bytes to complete the character can be
1350# undetermined until the last input byte is received.
1351# - The number of input bytes can vary depending on previous input.
1352# - A single input byte can correspond to many characters of output.
1353# - The number of output characters can be undetermined until the
1354# last input byte is received.
1355# - The number of output characters can vary depending on previous input.
1356
1357class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1358 """
1359 For testing seek/tell behavior with a stateful, buffering decoder.
1360
1361 Input is a sequence of words. Words may be fixed-length (length set
1362 by input) or variable-length (period-terminated). In variable-length
1363 mode, extra periods are ignored. Possible words are:
1364 - 'i' followed by a number sets the input length, I (maximum 99).
1365 When I is set to 0, words are space-terminated.
1366 - 'o' followed by a number sets the output length, O (maximum 99).
1367 - Any other word is converted into a word followed by a period on
1368 the output. The output word consists of the input word truncated
1369 or padded out with hyphens to make its length equal to O. If O
1370 is 0, the word is output verbatim without truncating or padding.
1371 I and O are initially set to 1. When I changes, any buffered input is
1372 re-scanned according to the new I. EOF also terminates the last word.
1373 """
1374
1375 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001376 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001377 self.reset()
1378
1379 def __repr__(self):
1380 return '<SID %x>' % id(self)
1381
1382 def reset(self):
1383 self.i = 1
1384 self.o = 1
1385 self.buffer = bytearray()
1386
1387 def getstate(self):
1388 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1389 return bytes(self.buffer), i*100 + o
1390
1391 def setstate(self, state):
1392 buffer, io = state
1393 self.buffer = bytearray(buffer)
1394 i, o = divmod(io, 100)
1395 self.i, self.o = i ^ 1, o ^ 1
1396
1397 def decode(self, input, final=False):
1398 output = ''
1399 for b in input:
1400 if self.i == 0: # variable-length, terminated with period
1401 if b == ord('.'):
1402 if self.buffer:
1403 output += self.process_word()
1404 else:
1405 self.buffer.append(b)
1406 else: # fixed-length, terminate after self.i bytes
1407 self.buffer.append(b)
1408 if len(self.buffer) == self.i:
1409 output += self.process_word()
1410 if final and self.buffer: # EOF terminates the last word
1411 output += self.process_word()
1412 return output
1413
1414 def process_word(self):
1415 output = ''
1416 if self.buffer[0] == ord('i'):
1417 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1418 elif self.buffer[0] == ord('o'):
1419 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1420 else:
1421 output = self.buffer.decode('ascii')
1422 if len(output) < self.o:
1423 output += '-'*self.o # pad out with hyphens
1424 if self.o:
1425 output = output[:self.o] # truncate to output length
1426 output += '.'
1427 self.buffer = bytearray()
1428 return output
1429
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001430 codecEnabled = False
1431
1432 @classmethod
1433 def lookupTestDecoder(cls, name):
1434 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001435 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001436 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001437 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001438 incrementalencoder=None,
1439 streamreader=None, streamwriter=None,
1440 incrementaldecoder=cls)
1441
1442# Register the previous decoder for testing.
1443# Disabled by default, tests will enable it.
1444codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1445
1446
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001447class StatefulIncrementalDecoderTest(unittest.TestCase):
1448 """
1449 Make sure the StatefulIncrementalDecoder actually works.
1450 """
1451
1452 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001453 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001454 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001455 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001456 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001457 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001458 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001459 # I=0, O=6 (variable-length input, fixed-length output)
1460 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1461 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001462 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001463 # I=6, O=3 (fixed-length input > fixed-length output)
1464 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1465 # I=0, then 3; O=29, then 15 (with longer output)
1466 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1467 'a----------------------------.' +
1468 'b----------------------------.' +
1469 'cde--------------------------.' +
1470 'abcdefghijabcde.' +
1471 'a.b------------.' +
1472 '.c.------------.' +
1473 'd.e------------.' +
1474 'k--------------.' +
1475 'l--------------.' +
1476 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001477 ]
1478
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001479 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001480 # Try a few one-shot test cases.
1481 for input, eof, output in self.test_cases:
1482 d = StatefulIncrementalDecoder()
1483 self.assertEquals(d.decode(input, eof), output)
1484
1485 # Also test an unfinished decode, followed by forcing EOF.
1486 d = StatefulIncrementalDecoder()
1487 self.assertEquals(d.decode(b'oiabcd'), '')
1488 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001489
1490class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001491
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001492 def setUp(self):
1493 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1494 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001495 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001496
Guido van Rossumd0712812007-04-11 16:32:43 +00001497 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001498 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001499
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001500 def test_constructor(self):
1501 r = self.BytesIO(b"\xc3\xa9\n\n")
1502 b = self.BufferedReader(r, 1000)
1503 t = self.TextIOWrapper(b)
1504 t.__init__(b, encoding="latin1", newline="\r\n")
1505 self.assertEquals(t.encoding, "latin1")
1506 self.assertEquals(t.line_buffering, False)
1507 t.__init__(b, encoding="utf8", line_buffering=True)
1508 self.assertEquals(t.encoding, "utf8")
1509 self.assertEquals(t.line_buffering, True)
1510 self.assertEquals("\xe9\n", t.readline())
1511 self.assertRaises(TypeError, t.__init__, b, newline=42)
1512 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1513
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001514 def test_detach(self):
1515 r = self.BytesIO()
1516 b = self.BufferedWriter(r)
1517 t = self.TextIOWrapper(b)
1518 self.assertIs(t.detach(), b)
1519
1520 t = self.TextIOWrapper(b, encoding="ascii")
1521 t.write("howdy")
1522 self.assertFalse(r.getvalue())
1523 t.detach()
1524 self.assertEqual(r.getvalue(), b"howdy")
1525 self.assertRaises(ValueError, t.detach)
1526
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001527 def test_repr(self):
1528 raw = self.BytesIO("hello".encode("utf-8"))
1529 b = self.BufferedReader(raw)
1530 t = self.TextIOWrapper(b, encoding="utf-8")
1531 self.assertEqual(repr(t), "<TextIOWrapper encoding=utf-8>")
1532
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001533 def test_line_buffering(self):
1534 r = self.BytesIO()
1535 b = self.BufferedWriter(r, 1000)
1536 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001537 t.write("X")
1538 self.assertEquals(r.getvalue(), b"") # No flush happened
1539 t.write("Y\nZ")
1540 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1541 t.write("A\rB")
1542 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1543
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001544 def test_encoding(self):
1545 # Check the encoding attribute is always set, and valid
1546 b = self.BytesIO()
1547 t = self.TextIOWrapper(b, encoding="utf8")
1548 self.assertEqual(t.encoding, "utf8")
1549 t = self.TextIOWrapper(b)
1550 self.assert_(t.encoding is not None)
1551 codecs.lookup(t.encoding)
1552
1553 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001554 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001555 b = self.BytesIO(b"abc\n\xff\n")
1556 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001557 self.assertRaises(UnicodeError, t.read)
1558 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001559 b = self.BytesIO(b"abc\n\xff\n")
1560 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001561 self.assertRaises(UnicodeError, t.read)
1562 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001563 b = self.BytesIO(b"abc\n\xff\n")
1564 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001565 self.assertEquals(t.read(), "abc\n\n")
1566 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001567 b = self.BytesIO(b"abc\n\xff\n")
1568 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001569 self.assertEquals(t.read(), "abc\n\ufffd\n")
1570
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001571 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001572 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001573 b = self.BytesIO()
1574 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001575 self.assertRaises(UnicodeError, t.write, "\xff")
1576 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001577 b = self.BytesIO()
1578 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001579 self.assertRaises(UnicodeError, t.write, "\xff")
1580 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001581 b = self.BytesIO()
1582 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001583 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001584 t.write("abc\xffdef\n")
1585 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001586 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001587 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001588 b = self.BytesIO()
1589 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001590 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001591 t.write("abc\xffdef\n")
1592 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001593 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001594
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001595 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001596 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1597
1598 tests = [
1599 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001600 [ '', input_lines ],
1601 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1602 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1603 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001604 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001605 encodings = (
1606 'utf-8', 'latin-1',
1607 'utf-16', 'utf-16-le', 'utf-16-be',
1608 'utf-32', 'utf-32-le', 'utf-32-be',
1609 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001610
Guido van Rossum8358db22007-08-18 21:39:55 +00001611 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001612 # character in TextIOWrapper._pending_line.
1613 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001614 # XXX: str.encode() should return bytes
1615 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001616 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001617 for bufsize in range(1, 10):
1618 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001619 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1620 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001621 encoding=encoding)
1622 if do_reads:
1623 got_lines = []
1624 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001625 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001626 if c2 == '':
1627 break
1628 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001629 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001630 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001631 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001632
1633 for got_line, exp_line in zip(got_lines, exp_lines):
1634 self.assertEquals(got_line, exp_line)
1635 self.assertEquals(len(got_lines), len(exp_lines))
1636
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001637 def test_newlines_input(self):
1638 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001639 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1640 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001641 (None, normalized.decode("ascii").splitlines(True)),
1642 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001643 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1644 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1645 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001646 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001647 buf = self.BytesIO(testdata)
1648 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001649 self.assertEquals(txt.readlines(), expected)
1650 txt.seek(0)
1651 self.assertEquals(txt.read(), "".join(expected))
1652
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001653 def test_newlines_output(self):
1654 testdict = {
1655 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1656 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1657 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1658 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1659 }
1660 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1661 for newline, expected in tests:
1662 buf = self.BytesIO()
1663 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1664 txt.write("AAA\nB")
1665 txt.write("BB\nCCC\n")
1666 txt.write("X\rY\r\nZ")
1667 txt.flush()
1668 self.assertEquals(buf.closed, False)
1669 self.assertEquals(buf.getvalue(), expected)
1670
1671 def test_destructor(self):
1672 l = []
1673 base = self.BytesIO
1674 class MyBytesIO(base):
1675 def close(self):
1676 l.append(self.getvalue())
1677 base.close(self)
1678 b = MyBytesIO()
1679 t = self.TextIOWrapper(b, encoding="ascii")
1680 t.write("abc")
1681 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001682 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001683 self.assertEquals([b"abc"], l)
1684
1685 def test_override_destructor(self):
1686 record = []
1687 class MyTextIO(self.TextIOWrapper):
1688 def __del__(self):
1689 record.append(1)
1690 try:
1691 f = super().__del__
1692 except AttributeError:
1693 pass
1694 else:
1695 f()
1696 def close(self):
1697 record.append(2)
1698 super().close()
1699 def flush(self):
1700 record.append(3)
1701 super().flush()
1702 b = self.BytesIO()
1703 t = MyTextIO(b, encoding="ascii")
1704 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001705 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001706 self.assertEqual(record, [1, 2, 3])
1707
1708 def test_error_through_destructor(self):
1709 # Test that the exception state is not modified by a destructor,
1710 # even if close() fails.
1711 rawio = self.CloseFailureIO()
1712 def f():
1713 self.TextIOWrapper(rawio).xyzzy
1714 with support.captured_output("stderr") as s:
1715 self.assertRaises(AttributeError, f)
1716 s = s.getvalue().strip()
1717 if s:
1718 # The destructor *may* have printed an unraisable error, check it
1719 self.assertEqual(len(s.splitlines()), 1)
1720 self.assert_(s.startswith("Exception IOError: "), s)
1721 self.assert_(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001722
Guido van Rossum9b76da62007-04-11 01:09:03 +00001723 # Systematic tests of the text I/O API
1724
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001725 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001726 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1727 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001728 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001729 f._CHUNK_SIZE = chunksize
1730 self.assertEquals(f.write("abc"), 3)
1731 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001732 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001733 f._CHUNK_SIZE = chunksize
1734 self.assertEquals(f.tell(), 0)
1735 self.assertEquals(f.read(), "abc")
1736 cookie = f.tell()
1737 self.assertEquals(f.seek(0), 0)
1738 self.assertEquals(f.read(2), "ab")
1739 self.assertEquals(f.read(1), "c")
1740 self.assertEquals(f.read(1), "")
1741 self.assertEquals(f.read(), "")
1742 self.assertEquals(f.tell(), cookie)
1743 self.assertEquals(f.seek(0), 0)
1744 self.assertEquals(f.seek(0, 2), cookie)
1745 self.assertEquals(f.write("def"), 3)
1746 self.assertEquals(f.seek(cookie), cookie)
1747 self.assertEquals(f.read(), "def")
1748 if enc.startswith("utf"):
1749 self.multi_line_test(f, enc)
1750 f.close()
1751
1752 def multi_line_test(self, f, enc):
1753 f.seek(0)
1754 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001755 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001756 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001757 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001758 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001759 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001760 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001761 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001762 wlines.append((f.tell(), line))
1763 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001764 f.seek(0)
1765 rlines = []
1766 while True:
1767 pos = f.tell()
1768 line = f.readline()
1769 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001770 break
1771 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +00001772 self.assertEquals(rlines, wlines)
1773
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001774 def test_telling(self):
1775 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001776 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001777 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001778 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001779 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001780 p2 = f.tell()
1781 f.seek(0)
1782 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001783 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001784 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001785 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001786 self.assertEquals(f.tell(), p2)
1787 f.seek(0)
1788 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001789 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001790 self.assertRaises(IOError, f.tell)
1791 self.assertEquals(f.tell(), p2)
1792 f.close()
1793
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001794 def test_seeking(self):
1795 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001796 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001797 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001798 prefix = bytes(u_prefix.encode("utf-8"))
1799 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001800 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001801 suffix = bytes(u_suffix.encode("utf-8"))
1802 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001803 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001804 f.write(line*2)
1805 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001806 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001807 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001808 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001809 self.assertEquals(f.tell(), prefix_size)
1810 self.assertEquals(f.readline(), u_suffix)
1811
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001812 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00001813 # Regression test for a specific bug
1814 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001815 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001816 f.write(data)
1817 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001818 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001819 f._CHUNK_SIZE # Just test that it exists
1820 f._CHUNK_SIZE = 2
1821 f.readline()
1822 f.tell()
1823
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001824 def test_seek_and_tell(self):
1825 #Test seek/tell using the StatefulIncrementalDecoder.
1826 # Make test faster by doing smaller seeks
1827 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001828
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001829 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001830 """Tell/seek to various points within a data stream and ensure
1831 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001832 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001833 f.write(data)
1834 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001835 f = self.open(support.TESTFN, encoding='test_decoder')
1836 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001837 decoded = f.read()
1838 f.close()
1839
Neal Norwitze2b07052008-03-18 19:52:05 +00001840 for i in range(min_pos, len(decoded) + 1): # seek positions
1841 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001842 f = self.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001843 self.assertEquals(f.read(i), decoded[:i])
1844 cookie = f.tell()
1845 self.assertEquals(f.read(j), decoded[i:i + j])
1846 f.seek(cookie)
1847 self.assertEquals(f.read(), decoded[i:])
1848 f.close()
1849
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001850 # Enable the test decoder.
1851 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001852
1853 # Run the tests.
1854 try:
1855 # Try each test case.
1856 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001857 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001858
1859 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001860 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1861 offset = CHUNK_SIZE - len(input)//2
1862 prefix = b'.'*offset
1863 # Don't bother seeking into the prefix (takes too long).
1864 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001865 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001866
1867 # Ensure our test decoder won't interfere with subsequent tests.
1868 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001869 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001870
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001871 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001872 data = "1234567890"
1873 tests = ("utf-16",
1874 "utf-16-le",
1875 "utf-16-be",
1876 "utf-32",
1877 "utf-32-le",
1878 "utf-32-be")
1879 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001880 buf = self.BytesIO()
1881 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001882 # Check if the BOM is written only once (see issue1753).
1883 f.write(data)
1884 f.write(data)
1885 f.seek(0)
1886 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00001887 f.seek(0)
1888 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001889 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1890
Benjamin Petersona1b49012009-03-31 23:11:32 +00001891 def test_unreadable(self):
1892 class UnReadable(self.BytesIO):
1893 def readable(self):
1894 return False
1895 txt = self.TextIOWrapper(UnReadable())
1896 self.assertRaises(IOError, txt.read)
1897
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001898 def test_read_one_by_one(self):
1899 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001900 reads = ""
1901 while True:
1902 c = txt.read(1)
1903 if not c:
1904 break
1905 reads += c
1906 self.assertEquals(reads, "AA\nBB")
1907
1908 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001909 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001910 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001911 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001912 reads = ""
1913 while True:
1914 c = txt.read(128)
1915 if not c:
1916 break
1917 reads += c
1918 self.assertEquals(reads, "A"*127+"\nB")
1919
1920 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001921 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001922
1923 # read one char at a time
1924 reads = ""
1925 while True:
1926 c = txt.read(1)
1927 if not c:
1928 break
1929 reads += c
1930 self.assertEquals(reads, self.normalized)
1931
1932 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001933 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001934 txt._CHUNK_SIZE = 4
1935
1936 reads = ""
1937 while True:
1938 c = txt.read(4)
1939 if not c:
1940 break
1941 reads += c
1942 self.assertEquals(reads, self.normalized)
1943
1944 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001945 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001946 txt._CHUNK_SIZE = 4
1947
1948 reads = txt.read(4)
1949 reads += txt.read(4)
1950 reads += txt.readline()
1951 reads += txt.readline()
1952 reads += txt.readline()
1953 self.assertEquals(reads, self.normalized)
1954
1955 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001956 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001957 txt._CHUNK_SIZE = 4
1958
1959 reads = txt.read(4)
1960 reads += txt.read()
1961 self.assertEquals(reads, self.normalized)
1962
1963 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001964 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001965 txt._CHUNK_SIZE = 4
1966
1967 reads = txt.read(4)
1968 pos = txt.tell()
1969 txt.seek(0)
1970 txt.seek(pos)
1971 self.assertEquals(txt.read(4), "BBB\n")
1972
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001973 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001974 buffer = self.BytesIO(self.testdata)
1975 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001976
1977 self.assertEqual(buffer.seekable(), txt.seekable())
1978
Antoine Pitroue4501852009-05-14 18:55:55 +00001979 def test_append_bom(self):
1980 # The BOM is not written again when appending to a non-empty file
1981 filename = support.TESTFN
1982 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
1983 with self.open(filename, 'w', encoding=charset) as f:
1984 f.write('aaa')
1985 pos = f.tell()
1986 with self.open(filename, 'rb') as f:
1987 self.assertEquals(f.read(), 'aaa'.encode(charset))
1988
1989 with self.open(filename, 'a', encoding=charset) as f:
1990 f.write('xxx')
1991 with self.open(filename, 'rb') as f:
1992 self.assertEquals(f.read(), 'aaaxxx'.encode(charset))
1993
1994 def test_seek_bom(self):
1995 # Same test, but when seeking manually
1996 filename = support.TESTFN
1997 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
1998 with self.open(filename, 'w', encoding=charset) as f:
1999 f.write('aaa')
2000 pos = f.tell()
2001 with self.open(filename, 'r+', encoding=charset) as f:
2002 f.seek(pos)
2003 f.write('zzz')
2004 f.seek(0)
2005 f.write('bbb')
2006 with self.open(filename, 'rb') as f:
2007 self.assertEquals(f.read(), 'bbbzzz'.encode(charset))
2008
2009
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002010class CTextIOWrapperTest(TextIOWrapperTest):
2011
2012 def test_initialization(self):
2013 r = self.BytesIO(b"\xc3\xa9\n\n")
2014 b = self.BufferedReader(r, 1000)
2015 t = self.TextIOWrapper(b)
2016 self.assertRaises(TypeError, t.__init__, b, newline=42)
2017 self.assertRaises(ValueError, t.read)
2018 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2019 self.assertRaises(ValueError, t.read)
2020
2021 def test_garbage_collection(self):
2022 # C TextIOWrapper objects are collected, and collecting them flushes
2023 # all data to disk.
2024 # The Python version has __del__, so it ends in gc.garbage instead.
2025 rawio = io.FileIO(support.TESTFN, "wb")
2026 b = self.BufferedWriter(rawio)
2027 t = self.TextIOWrapper(b, encoding="ascii")
2028 t.write("456def")
2029 t.x = t
2030 wr = weakref.ref(t)
2031 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002032 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002033 self.assert_(wr() is None, wr)
2034 with open(support.TESTFN, "rb") as f:
2035 self.assertEqual(f.read(), b"456def")
2036
2037class PyTextIOWrapperTest(TextIOWrapperTest):
2038 pass
2039
2040
2041class IncrementalNewlineDecoderTest(unittest.TestCase):
2042
2043 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002044 # UTF-8 specific tests for a newline decoder
2045 def _check_decode(b, s, **kwargs):
2046 # We exercise getstate() / setstate() as well as decode()
2047 state = decoder.getstate()
2048 self.assertEquals(decoder.decode(b, **kwargs), s)
2049 decoder.setstate(state)
2050 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002051
Antoine Pitrou180a3362008-12-14 16:36:46 +00002052 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002053
Antoine Pitrou180a3362008-12-14 16:36:46 +00002054 _check_decode(b'\xe8', "")
2055 _check_decode(b'\xa2', "")
2056 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002057
Antoine Pitrou180a3362008-12-14 16:36:46 +00002058 _check_decode(b'\xe8', "")
2059 _check_decode(b'\xa2', "")
2060 _check_decode(b'\x88', "\u8888")
2061
2062 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002063 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2064
Antoine Pitrou180a3362008-12-14 16:36:46 +00002065 decoder.reset()
2066 _check_decode(b'\n', "\n")
2067 _check_decode(b'\r', "")
2068 _check_decode(b'', "\n", final=True)
2069 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002070
Antoine Pitrou180a3362008-12-14 16:36:46 +00002071 _check_decode(b'\r', "")
2072 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002073
Antoine Pitrou180a3362008-12-14 16:36:46 +00002074 _check_decode(b'\r\r\n', "\n\n")
2075 _check_decode(b'\r', "")
2076 _check_decode(b'\r', "\n")
2077 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002078
Antoine Pitrou180a3362008-12-14 16:36:46 +00002079 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2080 _check_decode(b'\xe8\xa2\x88', "\u8888")
2081 _check_decode(b'\n', "\n")
2082 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2083 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002084
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002085 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00002086 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002087 if encoding is not None:
2088 encoder = codecs.getincrementalencoder(encoding)()
2089 def _decode_bytewise(s):
2090 # Decode one byte at a time
2091 for b in encoder.encode(s):
2092 result.append(decoder.decode(bytes([b])))
2093 else:
2094 encoder = None
2095 def _decode_bytewise(s):
2096 # Decode one char at a time
2097 for c in s:
2098 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00002099 self.assertEquals(decoder.newlines, None)
2100 _decode_bytewise("abc\n\r")
2101 self.assertEquals(decoder.newlines, '\n')
2102 _decode_bytewise("\nabc")
2103 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2104 _decode_bytewise("abc\r")
2105 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
2106 _decode_bytewise("abc")
2107 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
2108 _decode_bytewise("abc\r")
2109 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
2110 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002111 input = "abc"
2112 if encoder is not None:
2113 encoder.reset()
2114 input = encoder.encode(input)
2115 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00002116 self.assertEquals(decoder.newlines, None)
2117
2118 def test_newline_decoder(self):
2119 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002120 # None meaning the IncrementalNewlineDecoder takes unicode input
2121 # rather than bytes input
2122 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00002123 'utf-16', 'utf-16-le', 'utf-16-be',
2124 'utf-32', 'utf-32-le', 'utf-32-be',
2125 )
2126 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002127 decoder = enc and codecs.getincrementaldecoder(enc)()
2128 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2129 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002130 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002131 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2132 self.check_newline_decoding_utf8(decoder)
2133
Antoine Pitrou66913e22009-03-06 23:40:56 +00002134 def test_newline_bytes(self):
2135 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2136 def _check(dec):
2137 self.assertEquals(dec.newlines, None)
2138 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
2139 self.assertEquals(dec.newlines, None)
2140 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
2141 self.assertEquals(dec.newlines, None)
2142 dec = self.IncrementalNewlineDecoder(None, translate=False)
2143 _check(dec)
2144 dec = self.IncrementalNewlineDecoder(None, translate=True)
2145 _check(dec)
2146
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002147class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2148 pass
2149
2150class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2151 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00002152
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002153
Guido van Rossum01a27522007-03-07 01:00:12 +00002154# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002155
Guido van Rossum5abbf752007-08-27 17:39:33 +00002156class MiscIOTest(unittest.TestCase):
2157
Barry Warsaw40e82462008-11-20 20:14:50 +00002158 def tearDown(self):
2159 support.unlink(support.TESTFN)
2160
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002161 def test___all__(self):
2162 for name in self.io.__all__:
2163 obj = getattr(self.io, name, None)
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002164 self.assertTrue(obj is not None, name)
Guido van Rossum5abbf752007-08-27 17:39:33 +00002165 if name == "open":
2166 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00002167 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00002168 self.assertTrue(issubclass(obj, Exception), name)
2169 elif not name.startswith("SEEK_"):
2170 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00002171
Barry Warsaw40e82462008-11-20 20:14:50 +00002172 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002173 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002174 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002175 f.close()
2176
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002177 f = self.open(support.TESTFN, "U")
Barry Warsaw40e82462008-11-20 20:14:50 +00002178 self.assertEquals(f.name, support.TESTFN)
2179 self.assertEquals(f.buffer.name, support.TESTFN)
2180 self.assertEquals(f.buffer.raw.name, support.TESTFN)
2181 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002182 self.assertEquals(f.buffer.mode, "rb")
2183 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002184 f.close()
2185
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002186 f = self.open(support.TESTFN, "w+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002187 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002188 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2189 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002190
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002191 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002192 self.assertEquals(g.mode, "wb")
2193 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002194 self.assertEquals(g.name, f.fileno())
2195 self.assertEquals(g.raw.name, f.fileno())
2196 f.close()
2197 g.close()
2198
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002199 def test_io_after_close(self):
2200 for kwargs in [
2201 {"mode": "w"},
2202 {"mode": "wb"},
2203 {"mode": "w", "buffering": 1},
2204 {"mode": "w", "buffering": 2},
2205 {"mode": "wb", "buffering": 0},
2206 {"mode": "r"},
2207 {"mode": "rb"},
2208 {"mode": "r", "buffering": 1},
2209 {"mode": "r", "buffering": 2},
2210 {"mode": "rb", "buffering": 0},
2211 {"mode": "w+"},
2212 {"mode": "w+b"},
2213 {"mode": "w+", "buffering": 1},
2214 {"mode": "w+", "buffering": 2},
2215 {"mode": "w+b", "buffering": 0},
2216 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002217 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002218 f.close()
2219 self.assertRaises(ValueError, f.flush)
2220 self.assertRaises(ValueError, f.fileno)
2221 self.assertRaises(ValueError, f.isatty)
2222 self.assertRaises(ValueError, f.__iter__)
2223 if hasattr(f, "peek"):
2224 self.assertRaises(ValueError, f.peek, 1)
2225 self.assertRaises(ValueError, f.read)
2226 if hasattr(f, "read1"):
2227 self.assertRaises(ValueError, f.read1, 1024)
2228 if hasattr(f, "readinto"):
2229 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2230 self.assertRaises(ValueError, f.readline)
2231 self.assertRaises(ValueError, f.readlines)
2232 self.assertRaises(ValueError, f.seek, 0)
2233 self.assertRaises(ValueError, f.tell)
2234 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002235 self.assertRaises(ValueError, f.write,
2236 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002237 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002238 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002239
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002240 def test_blockingioerror(self):
2241 # Various BlockingIOError issues
2242 self.assertRaises(TypeError, self.BlockingIOError)
2243 self.assertRaises(TypeError, self.BlockingIOError, 1)
2244 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2245 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2246 b = self.BlockingIOError(1, "")
2247 self.assertEqual(b.characters_written, 0)
2248 class C(str):
2249 pass
2250 c = C("")
2251 b = self.BlockingIOError(1, c)
2252 c.b = b
2253 b.c = c
2254 wr = weakref.ref(c)
2255 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002256 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002257 self.assert_(wr() is None, wr)
2258
2259 def test_abcs(self):
2260 # Test the visible base classes are ABCs.
2261 self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
2262 self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
2263 self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
2264 self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))
2265
2266 def _check_abc_inheritance(self, abcmodule):
2267 with self.open(support.TESTFN, "wb", buffering=0) as f:
2268 self.assertTrue(isinstance(f, abcmodule.IOBase))
2269 self.assertTrue(isinstance(f, abcmodule.RawIOBase))
2270 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2271 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2272 with self.open(support.TESTFN, "wb") as f:
2273 self.assertTrue(isinstance(f, abcmodule.IOBase))
2274 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2275 self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
2276 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2277 with self.open(support.TESTFN, "w") as f:
2278 self.assertTrue(isinstance(f, abcmodule.IOBase))
2279 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2280 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2281 self.assertTrue(isinstance(f, abcmodule.TextIOBase))
2282
2283 def test_abc_inheritance(self):
2284 # Test implementations inherit from their respective ABCs
2285 self._check_abc_inheritance(self)
2286
2287 def test_abc_inheritance_official(self):
2288 # Test implementations inherit from the official ABCs of the
2289 # baseline "io" module.
2290 self._check_abc_inheritance(io)
2291
2292class CMiscIOTest(MiscIOTest):
2293 io = io
2294
2295class PyMiscIOTest(MiscIOTest):
2296 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002297
Guido van Rossum28524c72007-02-27 05:47:44 +00002298def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002299 tests = (CIOTest, PyIOTest,
2300 CBufferedReaderTest, PyBufferedReaderTest,
2301 CBufferedWriterTest, PyBufferedWriterTest,
2302 CBufferedRWPairTest, PyBufferedRWPairTest,
2303 CBufferedRandomTest, PyBufferedRandomTest,
2304 StatefulIncrementalDecoderTest,
2305 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2306 CTextIOWrapperTest, PyTextIOWrapperTest,
2307 CMiscIOTest, PyMiscIOTest,)
2308
2309 # Put the namespaces of the IO module we are testing and some useful mock
2310 # classes in the __dict__ of each test.
2311 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
2312 MockNonBlockWriterIO)
2313 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2314 c_io_ns = {name : getattr(io, name) for name in all_members}
2315 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2316 globs = globals()
2317 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2318 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2319 # Avoid turning open into a bound method.
2320 py_io_ns["open"] = pyio.OpenWrapper
2321 for test in tests:
2322 if test.__name__.startswith("C"):
2323 for name, obj in c_io_ns.items():
2324 setattr(test, name, obj)
2325 elif test.__name__.startswith("Py"):
2326 for name, obj in py_io_ns.items():
2327 setattr(test, name, obj)
2328
2329 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002330
2331if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002332 test_main()