blob: 53017f352a8f7922d6005cbac7c0f335757208fc [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
33from itertools import chain, cycle, count
34from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000035from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000036
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000037import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000038import io # C implementation of io
39import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000040
Guido van Rossuma9e20242007-03-08 00:43:48 +000041
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042def _default_chunk_size():
43 """Get the default TextIOWrapper chunk size"""
44 with open(__file__, "r", encoding="latin1") as f:
45 return f._CHUNK_SIZE
46
47
48class MockRawIO:
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000050 def __init__(self, read_stack=()):
51 self._read_stack = list(read_stack)
52 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000053 self._reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000054
55 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000056 self._reads += 1
Guido van Rossum68bbcd22007-02-27 17:19:33 +000057 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000058 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000059 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000060 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000061
Guido van Rossum01a27522007-03-07 01:00:12 +000062 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000063 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000064 return len(b)
65
66 def writable(self):
67 return True
68
Guido van Rossum68bbcd22007-02-27 17:19:33 +000069 def fileno(self):
70 return 42
71
72 def readable(self):
73 return True
74
Guido van Rossum01a27522007-03-07 01:00:12 +000075 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000076 return True
77
Guido van Rossum01a27522007-03-07 01:00:12 +000078 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000079 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +000080
81 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000082 return 0 # same comment as above
83
84 def readinto(self, buf):
85 self._reads += 1
86 max_len = len(buf)
87 try:
88 data = self._read_stack[0]
89 except IndexError:
90 return 0
91 if data is None:
92 del self._read_stack[0]
93 return None
94 n = len(data)
95 if len(data) <= max_len:
96 del self._read_stack[0]
97 buf[:n] = data
98 return n
99 else:
100 buf[:] = data[:max_len]
101 self._read_stack[0] = data[max_len:]
102 return max_len
103
104 def truncate(self, pos=None):
105 return pos
106
107class CMockRawIO(MockRawIO, io.RawIOBase):
108 pass
109
110class PyMockRawIO(MockRawIO, pyio.RawIOBase):
111 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000112
Guido van Rossuma9e20242007-03-08 00:43:48 +0000113
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000114class MisbehavedRawIO(MockRawIO):
115 def write(self, b):
116 return super().write(b) * 2
117
118 def read(self, n=None):
119 return super().read(n) * 2
120
121 def seek(self, pos, whence):
122 return -123
123
124 def tell(self):
125 return -456
126
127 def readinto(self, buf):
128 super().readinto(buf)
129 return len(buf) * 5
130
131class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
132 pass
133
134class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
135 pass
136
137
138class CloseFailureIO(MockRawIO):
139 closed = 0
140
141 def close(self):
142 if not self.closed:
143 self.closed = 1
144 raise IOError
145
146class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
147 pass
148
149class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
150 pass
151
152
153class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000154
155 def __init__(self, data):
156 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000157 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000158
159 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000160 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000161 self.read_history.append(None if res is None else len(res))
162 return res
163
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000164 def readinto(self, b):
165 res = super().readinto(b)
166 self.read_history.append(res)
167 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000168
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000169class CMockFileIO(MockFileIO, io.BytesIO):
170 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000171
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000172class PyMockFileIO(MockFileIO, pyio.BytesIO):
173 pass
174
175
176class MockNonBlockWriterIO:
177
178 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000179 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000180 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000181
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000182 def pop_written(self):
183 s = b"".join(self._write_stack)
184 self._write_stack[:] = []
185 return s
186
187 def block_on(self, char):
188 """Block when a given char is encountered."""
189 self._blocker_char = char
190
191 def readable(self):
192 return True
193
194 def seekable(self):
195 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000196
Guido van Rossum01a27522007-03-07 01:00:12 +0000197 def writable(self):
198 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000200 def write(self, b):
201 b = bytes(b)
202 n = -1
203 if self._blocker_char:
204 try:
205 n = b.index(self._blocker_char)
206 except ValueError:
207 pass
208 else:
209 self._blocker_char = None
210 self._write_stack.append(b[:n])
211 raise self.BlockingIOError(0, "test blocking", n)
212 self._write_stack.append(b)
213 return len(b)
214
215class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
216 BlockingIOError = io.BlockingIOError
217
218class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
219 BlockingIOError = pyio.BlockingIOError
220
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
Guido van Rossum28524c72007-02-27 05:47:44 +0000222class IOTest(unittest.TestCase):
223
Neal Norwitze7789b12008-03-24 06:18:09 +0000224 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000225 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000226
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000227 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000228 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000229
Guido van Rossum28524c72007-02-27 05:47:44 +0000230 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000231 self.assertEqual(f.write(b"blah."), 5)
232 self.assertEqual(f.seek(0), 0)
233 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000234 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000235 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000236 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +0000237 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +0000238 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000239 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000240 self.assertEqual(f.seek(-1, 2), 13)
241 self.assertEqual(f.tell(), 13)
242 self.assertEqual(f.truncate(12), 12)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000243 self.assertEqual(f.tell(), 12)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000244 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000245
Guido van Rossum9b76da62007-04-11 01:09:03 +0000246 def read_ops(self, f, buffered=False):
247 data = f.read(5)
248 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000249 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000250 self.assertEqual(f.readinto(data), 5)
251 self.assertEqual(data, b" worl")
252 self.assertEqual(f.readinto(data), 2)
253 self.assertEqual(len(data), 5)
254 self.assertEqual(data[:2], b"d\n")
255 self.assertEqual(f.seek(0), 0)
256 self.assertEqual(f.read(20), b"hello world\n")
257 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000258 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000259 self.assertEqual(f.seek(-6, 2), 6)
260 self.assertEqual(f.read(5), b"world")
261 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000262 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000263 self.assertEqual(f.seek(-6, 1), 5)
264 self.assertEqual(f.read(5), b" worl")
265 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000266 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000267 if buffered:
268 f.seek(0)
269 self.assertEqual(f.read(), b"hello world\n")
270 f.seek(6)
271 self.assertEqual(f.read(), b"world\n")
272 self.assertEqual(f.read(), b"")
273
Guido van Rossum34d69e52007-04-10 20:08:41 +0000274 LARGE = 2**31
275
Guido van Rossum53807da2007-04-10 19:01:47 +0000276 def large_file_ops(self, f):
277 assert f.readable()
278 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000279 self.assertEqual(f.seek(self.LARGE), self.LARGE)
280 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000281 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000282 self.assertEqual(f.tell(), self.LARGE + 3)
283 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000284 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000285 self.assertEqual(f.tell(), self.LARGE + 2)
286 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000287 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000288 self.assertEqual(f.tell(), self.LARGE + 1)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000289 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
290 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000291 self.assertEqual(f.read(2), b"x")
292
Guido van Rossum28524c72007-02-27 05:47:44 +0000293 def test_raw_file_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000294 f = self.open(support.TESTFN, "wb", buffering=0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000295 self.assertEqual(f.readable(), False)
296 self.assertEqual(f.writable(), True)
297 self.assertEqual(f.seekable(), True)
298 self.write_ops(f)
299 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000300 f = self.open(support.TESTFN, "rb", buffering=0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000301 self.assertEqual(f.readable(), True)
302 self.assertEqual(f.writable(), False)
303 self.assertEqual(f.seekable(), True)
304 self.read_ops(f)
305 f.close()
306
Guido van Rossum87429772007-04-10 21:06:59 +0000307 def test_buffered_file_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000308 f = self.open(support.TESTFN, "wb")
Guido van Rossum87429772007-04-10 21:06:59 +0000309 self.assertEqual(f.readable(), False)
310 self.assertEqual(f.writable(), True)
311 self.assertEqual(f.seekable(), True)
312 self.write_ops(f)
313 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000314 f = self.open(support.TESTFN, "rb")
Guido van Rossum87429772007-04-10 21:06:59 +0000315 self.assertEqual(f.readable(), True)
316 self.assertEqual(f.writable(), False)
317 self.assertEqual(f.seekable(), True)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000318 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000319 f.close()
320
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000321 def test_readline(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000322 f = io.open(support.TESTFN, "wb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000323 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000324 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000325 f = self.open(support.TESTFN, "rb")
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000326 self.assertEqual(f.readline(), b"abc\n")
327 self.assertEqual(f.readline(10), b"def\n")
328 self.assertEqual(f.readline(2), b"xy")
329 self.assertEqual(f.readline(4), b"zzy\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000330 self.assertEqual(f.readline(), b"foo\x00bar\n")
331 self.assertEqual(f.readline(), b"another line")
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000332 f.close()
333
Guido van Rossum28524c72007-02-27 05:47:44 +0000334 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000335 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000336 self.write_ops(f)
337 data = f.getvalue()
338 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000339 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000340 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000341
Guido van Rossum53807da2007-04-10 19:01:47 +0000342 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000343 # On Windows and Mac OSX this test comsumes large resources; It takes
344 # a long time to build the >2GB file and takes >2GB of disk space
345 # therefore the resource must be enabled to run this test.
346 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000347 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000348 print("\nTesting large file ops skipped on %s." % sys.platform,
349 file=sys.stderr)
350 print("It requires %d bytes and a long time." % self.LARGE,
351 file=sys.stderr)
352 print("Use 'regrtest.py -u largefile test_io' to run it.",
353 file=sys.stderr)
354 return
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000355 with self.open(support.TESTFN, "w+b", 0) as f:
356 self.large_file_ops(f)
357 with self.open(support.TESTFN, "w+b") as f:
358 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000359
360 def test_with_open(self):
361 for bufsize in (0, 1, 100):
362 f = None
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000363 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000364 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000365 self.assertEqual(f.closed, True)
366 f = None
367 try:
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000368 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000369 1/0
370 except ZeroDivisionError:
371 self.assertEqual(f.closed, True)
372 else:
373 self.fail("1/0 didn't raise an exception")
374
Antoine Pitrou08838b62009-01-21 00:55:13 +0000375 # issue 5008
376 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000377 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000378 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000379 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000380 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000381 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000382 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000383 with self.open(support.TESTFN, "a") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000384 self.assert_(f.tell() > 0)
385
Guido van Rossum87429772007-04-10 21:06:59 +0000386 def test_destructor(self):
387 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000388 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000389 def __del__(self):
390 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000391 try:
392 f = super().__del__
393 except AttributeError:
394 pass
395 else:
396 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000397 def close(self):
398 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000399 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000400 def flush(self):
401 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000402 super().flush()
403 f = MyFileIO(support.TESTFN, "wb")
404 f.write(b"xxx")
405 del f
406 self.assertEqual(record, [1, 2, 3])
407 f = open(support.TESTFN, "rb")
408 self.assertEqual(f.read(), b"xxx")
409
410 def _check_base_destructor(self, base):
411 record = []
412 class MyIO(base):
413 def __init__(self):
414 # This exercises the availability of attributes on object
415 # destruction.
416 # (in the C version, close() is called by the tp_dealloc
417 # function, not by __del__)
418 self.on_del = 1
419 self.on_close = 2
420 self.on_flush = 3
421 def __del__(self):
422 record.append(self.on_del)
423 try:
424 f = super().__del__
425 except AttributeError:
426 pass
427 else:
428 f()
429 def close(self):
430 record.append(self.on_close)
431 super().close()
432 def flush(self):
433 record.append(self.on_flush)
434 super().flush()
435 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000436 del f
437 self.assertEqual(record, [1, 2, 3])
438
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000439 def test_IOBase_destructor(self):
440 self._check_base_destructor(self.IOBase)
441
442 def test_RawIOBase_destructor(self):
443 self._check_base_destructor(self.RawIOBase)
444
445 def test_BufferedIOBase_destructor(self):
446 self._check_base_destructor(self.BufferedIOBase)
447
448 def test_TextIOBase_destructor(self):
449 self._check_base_destructor(self.TextIOBase)
450
Guido van Rossum87429772007-04-10 21:06:59 +0000451 def test_close_flushes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000452 f = self.open(support.TESTFN, "wb")
Guido van Rossum2b08b382007-05-08 20:18:39 +0000453 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000454 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000455 f = self.open(support.TESTFN, "rb")
Guido van Rossum87429772007-04-10 21:06:59 +0000456 self.assertEqual(f.read(), b"xxx")
457 f.close()
Guido van Rossuma9e20242007-03-08 00:43:48 +0000458
Guido van Rossumd4103952007-04-12 05:44:49 +0000459 def test_array_writes(self):
460 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000461 n = len(a.tostring())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000462 f = self.open(support.TESTFN, "wb", 0)
Guido van Rossumd4103952007-04-12 05:44:49 +0000463 self.assertEqual(f.write(a), n)
464 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000465 f = self.open(support.TESTFN, "wb")
Guido van Rossumd4103952007-04-12 05:44:49 +0000466 self.assertEqual(f.write(a), n)
467 f.close()
468
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000469 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000470 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000471 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000472
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000473 def test_read_closed(self):
474 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000475 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000476 with self.open(support.TESTFN, "r") as f:
477 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000478 self.assertEqual(file.read(), "egg\n")
479 file.seek(0)
480 file.close()
481 self.assertRaises(ValueError, file.read)
482
483 def test_no_closefd_with_filename(self):
484 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000485 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000486
487 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000488 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000489 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000490 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000491 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000492 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000493 self.assertEqual(file.buffer.raw.closefd, False)
494
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000495 def test_garbage_collection(self):
496 # FileIO objects are collected, and collecting them flushes
497 # all data to disk.
498 f = self.FileIO(support.TESTFN, "wb")
499 f.write(b"abcxxx")
500 f.f = f
501 wr = weakref.ref(f)
502 del f
503 gc.collect()
504 self.assert_(wr() is None, wr)
505 with open(support.TESTFN, "rb") as f:
506 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000507
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000508 def test_unbounded_file(self):
509 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
510 zero = "/dev/zero"
511 if not os.path.exists(zero):
512 raise unittest.SkipTest("{0} does not exist".format(zero))
513 if sys.maxsize > 0x7FFFFFFF:
514 raise unittest.SkipTest("test can only run in a 32-bit address space")
515 if support.real_max_memuse < support._2G:
516 raise unittest.SkipTest("test requires at least 2GB of memory")
517 with open(zero, "rb", buffering=0) as f:
518 self.assertRaises(OverflowError, f.read)
519 with open(zero, "rb") as f:
520 self.assertRaises(OverflowError, f.read)
521 with open(zero, "r") as f:
522 self.assertRaises(OverflowError, f.read)
523
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000524class CIOTest(IOTest):
525 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000526
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000527class PyIOTest(IOTest):
528 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000529
Guido van Rossuma9e20242007-03-08 00:43:48 +0000530
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000531class CommonBufferedTests:
532 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
533
534 def test_fileno(self):
535 rawio = self.MockRawIO()
536 bufio = self.tp(rawio)
537
538 self.assertEquals(42, bufio.fileno())
539
540 def test_no_fileno(self):
541 # XXX will we always have fileno() function? If so, kill
542 # this test. Else, write it.
543 pass
544
545 def test_invalid_args(self):
546 rawio = self.MockRawIO()
547 bufio = self.tp(rawio)
548 # Invalid whence
549 self.assertRaises(ValueError, bufio.seek, 0, -1)
550 self.assertRaises(ValueError, bufio.seek, 0, 3)
551
552 def test_override_destructor(self):
553 tp = self.tp
554 record = []
555 class MyBufferedIO(tp):
556 def __del__(self):
557 record.append(1)
558 try:
559 f = super().__del__
560 except AttributeError:
561 pass
562 else:
563 f()
564 def close(self):
565 record.append(2)
566 super().close()
567 def flush(self):
568 record.append(3)
569 super().flush()
570 rawio = self.MockRawIO()
571 bufio = MyBufferedIO(rawio)
572 writable = bufio.writable()
573 del bufio
574 if writable:
575 self.assertEqual(record, [1, 2, 3])
576 else:
577 self.assertEqual(record, [1, 2])
578
579 def test_context_manager(self):
580 # Test usability as a context manager
581 rawio = self.MockRawIO()
582 bufio = self.tp(rawio)
583 def _with():
584 with bufio:
585 pass
586 _with()
587 # bufio should now be closed, and using it a second time should raise
588 # a ValueError.
589 self.assertRaises(ValueError, _with)
590
591 def test_error_through_destructor(self):
592 # Test that the exception state is not modified by a destructor,
593 # even if close() fails.
594 rawio = self.CloseFailureIO()
595 def f():
596 self.tp(rawio).xyzzy
597 with support.captured_output("stderr") as s:
598 self.assertRaises(AttributeError, f)
599 s = s.getvalue().strip()
600 if s:
601 # The destructor *may* have printed an unraisable error, check it
602 self.assertEqual(len(s.splitlines()), 1)
603 self.assert_(s.startswith("Exception IOError: "), s)
604 self.assert_(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000605
606
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000607class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
608 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000609
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000610 def test_constructor(self):
611 rawio = self.MockRawIO([b"abc"])
612 bufio = self.tp(rawio)
613 bufio.__init__(rawio)
614 bufio.__init__(rawio, buffer_size=1024)
615 bufio.__init__(rawio, buffer_size=16)
616 self.assertEquals(b"abc", bufio.read())
617 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
618 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
619 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
620 rawio = self.MockRawIO([b"abc"])
621 bufio.__init__(rawio)
622 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000623
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000624 def test_read(self):
625 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
626 bufio = self.tp(rawio)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000627 self.assertEquals(b"abcdef", bufio.read(6))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000628 # Invalid args
629 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000630
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000631 def test_read1(self):
632 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
633 bufio = self.tp(rawio)
634 self.assertEquals(b"a", bufio.read(1))
635 self.assertEquals(b"b", bufio.read1(1))
636 self.assertEquals(rawio._reads, 1)
637 self.assertEquals(b"c", bufio.read1(100))
638 self.assertEquals(rawio._reads, 1)
639 self.assertEquals(b"d", bufio.read1(100))
640 self.assertEquals(rawio._reads, 2)
641 self.assertEquals(b"efg", bufio.read1(100))
642 self.assertEquals(rawio._reads, 3)
643 self.assertEquals(b"", bufio.read1(100))
644 # Invalid args
645 self.assertRaises(ValueError, bufio.read1, -1)
646
647 def test_readinto(self):
648 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
649 bufio = self.tp(rawio)
650 b = bytearray(2)
651 self.assertEquals(bufio.readinto(b), 2)
652 self.assertEquals(b, b"ab")
653 self.assertEquals(bufio.readinto(b), 2)
654 self.assertEquals(b, b"cd")
655 self.assertEquals(bufio.readinto(b), 2)
656 self.assertEquals(b, b"ef")
657 self.assertEquals(bufio.readinto(b), 1)
658 self.assertEquals(b, b"gf")
659 self.assertEquals(bufio.readinto(b), 0)
660 self.assertEquals(b, b"gf")
661
662 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000663 data = b"abcdefghi"
664 dlen = len(data)
665
666 tests = [
667 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
668 [ 100, [ 3, 3, 3], [ dlen ] ],
669 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
670 ]
671
672 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000673 rawio = self.MockFileIO(data)
674 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000675 pos = 0
676 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000677 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000678 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000679 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000680 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000681
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000682 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000683 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000684 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
685 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000686
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000687 self.assertEquals(b"abcd", bufio.read(6))
688 self.assertEquals(b"e", bufio.read(1))
689 self.assertEquals(b"fg", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000690 self.assertEquals(b"", bufio.peek(1))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000691 self.assert_(None is bufio.read())
692 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000693
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000694 def test_read_past_eof(self):
695 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
696 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000697
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000698 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000699
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000700 def test_read_all(self):
701 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
702 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000703
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000704 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000705
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000706 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000707 try:
708 # Write out many bytes with exactly the same number of 0's,
709 # 1's... 255's. This will help us check that concurrent reading
710 # doesn't duplicate or forget contents.
711 N = 1000
712 l = list(range(256)) * N
713 random.shuffle(l)
714 s = bytes(bytearray(l))
715 with io.open(support.TESTFN, "wb") as f:
716 f.write(s)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000717 with io.open(support.TESTFN, self.read_mode, buffering=0) as raw:
718 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000719 errors = []
720 results = []
721 def f():
722 try:
723 # Intra-buffer read then buffer-flushing read
724 for n in cycle([1, 19]):
725 s = bufio.read(n)
726 if not s:
727 break
728 # list.append() is atomic
729 results.append(s)
730 except Exception as e:
731 errors.append(e)
732 raise
733 threads = [threading.Thread(target=f) for x in range(20)]
734 for t in threads:
735 t.start()
736 time.sleep(0.02) # yield
737 for t in threads:
738 t.join()
739 self.assertFalse(errors,
740 "the following exceptions were caught: %r" % errors)
741 s = b''.join(results)
742 for i in range(256):
743 c = bytes(bytearray([i]))
744 self.assertEqual(s.count(c), N)
745 finally:
746 support.unlink(support.TESTFN)
747
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000748 def test_misbehaved_io(self):
749 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
750 bufio = self.tp(rawio)
751 self.assertRaises(IOError, bufio.seek, 0)
752 self.assertRaises(IOError, bufio.tell)
753
754class CBufferedReaderTest(BufferedReaderTest):
755 tp = io.BufferedReader
756
757 def test_constructor(self):
758 BufferedReaderTest.test_constructor(self)
759 # The allocation can succeed on 32-bit builds, e.g. with more
760 # than 2GB RAM and a 64-bit kernel.
761 if sys.maxsize > 0x7FFFFFFF:
762 rawio = self.MockRawIO()
763 bufio = self.tp(rawio)
764 self.assertRaises((OverflowError, MemoryError, ValueError),
765 bufio.__init__, rawio, sys.maxsize)
766
767 def test_initialization(self):
768 rawio = self.MockRawIO([b"abc"])
769 bufio = self.tp(rawio)
770 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
771 self.assertRaises(ValueError, bufio.read)
772 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
773 self.assertRaises(ValueError, bufio.read)
774 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
775 self.assertRaises(ValueError, bufio.read)
776
777 def test_misbehaved_io_read(self):
778 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
779 bufio = self.tp(rawio)
780 # _pyio.BufferedReader seems to implement reading different, so that
781 # checking this is not so easy.
782 self.assertRaises(IOError, bufio.read, 10)
783
784 def test_garbage_collection(self):
785 # C BufferedReader objects are collected.
786 # The Python version has __del__, so it ends into gc.garbage instead
787 rawio = self.FileIO(support.TESTFN, "w+b")
788 f = self.tp(rawio)
789 f.f = f
790 wr = weakref.ref(f)
791 del f
792 gc.collect()
793 self.assert_(wr() is None, wr)
794
795class PyBufferedReaderTest(BufferedReaderTest):
796 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000797
Guido van Rossuma9e20242007-03-08 00:43:48 +0000798
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000799class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
800 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +0000801
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000802 def test_constructor(self):
803 rawio = self.MockRawIO()
804 bufio = self.tp(rawio)
805 bufio.__init__(rawio)
806 bufio.__init__(rawio, buffer_size=1024)
807 bufio.__init__(rawio, buffer_size=16)
808 self.assertEquals(3, bufio.write(b"abc"))
809 bufio.flush()
810 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
811 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
812 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
813 bufio.__init__(rawio)
814 self.assertEquals(3, bufio.write(b"ghi"))
815 bufio.flush()
816 self.assertEquals(b"".join(rawio._write_stack), b"abcghi")
817
818 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000819 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000820 writer = self.MockRawIO()
821 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000822 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000823 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000824
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000825 def test_write_overflow(self):
826 writer = self.MockRawIO()
827 bufio = self.tp(writer, 8)
828 contents = b"abcdefghijklmnop"
829 for n in range(0, len(contents), 3):
830 bufio.write(contents[n:n+3])
831 flushed = b"".join(writer._write_stack)
832 # At least (total - 8) bytes were implicitly flushed, perhaps more
833 # depending on the implementation.
834 self.assert_(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000835
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000836 def check_writes(self, intermediate_func):
837 # Lots of writes, test the flushed output is as expected.
838 contents = bytes(range(256)) * 1000
839 n = 0
840 writer = self.MockRawIO()
841 bufio = self.tp(writer, 13)
842 # Generator of write sizes: repeat each N 15 times then proceed to N+1
843 def gen_sizes():
844 for size in count(1):
845 for i in range(15):
846 yield size
847 sizes = gen_sizes()
848 while n < len(contents):
849 size = min(next(sizes), len(contents) - n)
850 self.assertEquals(bufio.write(contents[n:n+size]), size)
851 intermediate_func(bufio)
852 n += size
853 bufio.flush()
854 self.assertEquals(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000855
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000856 def test_writes(self):
857 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000858
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000859 def test_writes_and_flushes(self):
860 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +0000861
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000862 def test_writes_and_seeks(self):
863 def _seekabs(bufio):
864 pos = bufio.tell()
865 bufio.seek(pos + 1, 0)
866 bufio.seek(pos - 1, 0)
867 bufio.seek(pos, 0)
868 self.check_writes(_seekabs)
869 def _seekrel(bufio):
870 pos = bufio.seek(0, 1)
871 bufio.seek(+1, 1)
872 bufio.seek(-1, 1)
873 bufio.seek(pos, 0)
874 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +0000875
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000876 def test_writes_and_truncates(self):
877 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +0000878
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000879 def test_write_non_blocking(self):
880 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +0000881 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +0000882
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000883 self.assertEquals(bufio.write(b"abcd"), 4)
884 self.assertEquals(bufio.write(b"efghi"), 5)
885 # 1 byte will be written, the rest will be buffered
886 raw.block_on(b"k")
887 self.assertEquals(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +0000888
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000889 # 8 bytes will be written, 8 will be buffered and the rest will be lost
890 raw.block_on(b"0")
891 try:
892 bufio.write(b"opqrwxyz0123456789")
893 except self.BlockingIOError as e:
894 written = e.characters_written
895 else:
896 self.fail("BlockingIOError should have been raised")
897 self.assertEquals(written, 16)
898 self.assertEquals(raw.pop_written(),
899 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +0000900
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000901 self.assertEquals(bufio.write(b"ABCDEFGHI"), 9)
902 s = raw.pop_written()
903 # Previously buffered bytes were flushed
904 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +0000905
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000906 def test_write_and_rewind(self):
907 raw = io.BytesIO()
908 bufio = self.tp(raw, 4)
909 self.assertEqual(bufio.write(b"abcdef"), 6)
910 self.assertEqual(bufio.tell(), 6)
911 bufio.seek(0, 0)
912 self.assertEqual(bufio.write(b"XY"), 2)
913 bufio.seek(6, 0)
914 self.assertEqual(raw.getvalue(), b"XYcdef")
915 self.assertEqual(bufio.write(b"123456"), 6)
916 bufio.flush()
917 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000918
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000919 def test_flush(self):
920 writer = self.MockRawIO()
921 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000922 bufio.write(b"abc")
923 bufio.flush()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000924 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000925
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000926 def test_destructor(self):
927 writer = self.MockRawIO()
928 bufio = self.tp(writer, 8)
929 bufio.write(b"abc")
930 del bufio
931 self.assertEquals(b"abc", writer._write_stack[0])
932
933 def test_truncate(self):
934 # Truncate implicitly flushes the buffer.
935 with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
936 bufio = self.tp(raw, 8)
937 bufio.write(b"abcdef")
938 self.assertEqual(bufio.truncate(3), 3)
939 self.assertEqual(bufio.tell(), 3)
940 with io.open(support.TESTFN, "rb", buffering=0) as f:
941 self.assertEqual(f.read(), b"abc")
942
943 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000944 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000945 # Write out many bytes from many threads and test they were
946 # all flushed.
947 N = 1000
948 contents = bytes(range(256)) * N
949 sizes = cycle([1, 19])
950 n = 0
951 queue = deque()
952 while n < len(contents):
953 size = next(sizes)
954 queue.append(contents[n:n+size])
955 n += size
956 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +0000957 # We use a real file object because it allows us to
958 # exercise situations where the GIL is released before
959 # writing the buffer to the raw streams. This is in addition
960 # to concurrency issues due to switching threads in the middle
961 # of Python code.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000962 with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
963 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000964 errors = []
965 def f():
966 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000967 while True:
968 try:
969 s = queue.popleft()
970 except IndexError:
971 return
Antoine Pitrou87695762008-08-14 22:44:29 +0000972 bufio.write(s)
973 except Exception as e:
974 errors.append(e)
975 raise
976 threads = [threading.Thread(target=f) for x in range(20)]
977 for t in threads:
978 t.start()
979 time.sleep(0.02) # yield
980 for t in threads:
981 t.join()
982 self.assertFalse(errors,
983 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000984 bufio.close()
985 with io.open(support.TESTFN, "rb") as f:
986 s = f.read()
987 for i in range(256):
988 self.assertEquals(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +0000989 finally:
990 support.unlink(support.TESTFN)
991
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000992 def test_misbehaved_io(self):
993 rawio = self.MisbehavedRawIO()
994 bufio = self.tp(rawio, 5)
995 self.assertRaises(IOError, bufio.seek, 0)
996 self.assertRaises(IOError, bufio.tell)
997 self.assertRaises(IOError, bufio.write, b"abcdef")
998
Benjamin Peterson59406a92009-03-26 17:10:29 +0000999 def test_max_buffer_size_deprecation(self):
1000 with support.check_warnings() as w:
1001 warnings.simplefilter("always", DeprecationWarning)
1002 self.tp(self.MockRawIO(), 8, 12)
1003 self.assertEqual(len(w.warnings), 1)
1004 warning = w.warnings[0]
1005 self.assertTrue(warning.category is DeprecationWarning)
1006 self.assertEqual(str(warning.message),
1007 "max_buffer_size is deprecated")
1008
1009
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001010class CBufferedWriterTest(BufferedWriterTest):
1011 tp = io.BufferedWriter
1012
1013 def test_constructor(self):
1014 BufferedWriterTest.test_constructor(self)
1015 # The allocation can succeed on 32-bit builds, e.g. with more
1016 # than 2GB RAM and a 64-bit kernel.
1017 if sys.maxsize > 0x7FFFFFFF:
1018 rawio = self.MockRawIO()
1019 bufio = self.tp(rawio)
1020 self.assertRaises((OverflowError, MemoryError, ValueError),
1021 bufio.__init__, rawio, sys.maxsize)
1022
1023 def test_initialization(self):
1024 rawio = self.MockRawIO()
1025 bufio = self.tp(rawio)
1026 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1027 self.assertRaises(ValueError, bufio.write, b"def")
1028 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1029 self.assertRaises(ValueError, bufio.write, b"def")
1030 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1031 self.assertRaises(ValueError, bufio.write, b"def")
1032
1033 def test_garbage_collection(self):
1034 # C BufferedWriter objects are collected, and collecting them flushes
1035 # all data to disk.
1036 # The Python version has __del__, so it ends into gc.garbage instead
1037 rawio = self.FileIO(support.TESTFN, "w+b")
1038 f = self.tp(rawio)
1039 f.write(b"123xxx")
1040 f.x = f
1041 wr = weakref.ref(f)
1042 del f
1043 gc.collect()
1044 self.assert_(wr() is None, wr)
1045 with open(support.TESTFN, "rb") as f:
1046 self.assertEqual(f.read(), b"123xxx")
1047
1048
1049class PyBufferedWriterTest(BufferedWriterTest):
1050 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001051
Guido van Rossum01a27522007-03-07 01:00:12 +00001052class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001053
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001054 def test_basic(self):
1055 r = self.MockRawIO(())
1056 w = self.MockRawIO()
1057 pair = self.tp(r, w)
Benjamin Peterson92035012008-12-27 16:00:54 +00001058 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001059
Benjamin Peterson59406a92009-03-26 17:10:29 +00001060 def test_max_buffer_size_deprecation(self):
1061 with support.check_warnings() as w:
1062 warnings.simplefilter("always", DeprecationWarning)
1063 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1064 self.assertEqual(len(w.warnings), 1)
1065 warning = w.warnings[0]
1066 self.assertTrue(warning.category is DeprecationWarning)
1067 self.assertEqual(str(warning.message),
1068 "max_buffer_size is deprecated")
1069
Benjamin Peterson92035012008-12-27 16:00:54 +00001070 # XXX More Tests
Guido van Rossum01a27522007-03-07 01:00:12 +00001071
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001072class CBufferedRWPairTest(BufferedRWPairTest):
1073 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001074
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001075class PyBufferedRWPairTest(BufferedRWPairTest):
1076 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001077
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001078
1079class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1080 read_mode = "rb+"
1081 write_mode = "wb+"
1082
1083 def test_constructor(self):
1084 BufferedReaderTest.test_constructor(self)
1085 BufferedWriterTest.test_constructor(self)
1086
1087 def test_read_and_write(self):
1088 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00001089 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001090
1091 self.assertEqual(b"as", rw.read(2))
1092 rw.write(b"ddd")
1093 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001094 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001095 self.assertEqual(b"ghjk", rw.read())
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001096 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00001097
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001098 def test_seek_and_tell(self):
1099 raw = self.BytesIO(b"asdfghjkl")
1100 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00001101
1102 self.assertEquals(b"as", rw.read(2))
1103 self.assertEquals(2, rw.tell())
1104 rw.seek(0, 0)
1105 self.assertEquals(b"asdf", rw.read(4))
1106
1107 rw.write(b"asdf")
1108 rw.seek(0, 0)
1109 self.assertEquals(b"asdfasdfl", rw.read())
1110 self.assertEquals(9, rw.tell())
1111 rw.seek(-4, 2)
1112 self.assertEquals(5, rw.tell())
1113 rw.seek(2, 1)
1114 self.assertEquals(7, rw.tell())
1115 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +00001116 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00001117
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001118 def check_flush_and_read(self, read_func):
1119 raw = self.BytesIO(b"abcdefghi")
1120 bufio = self.tp(raw)
1121
1122 self.assertEquals(b"ab", read_func(bufio, 2))
1123 bufio.write(b"12")
1124 self.assertEquals(b"ef", read_func(bufio, 2))
1125 self.assertEquals(6, bufio.tell())
1126 bufio.flush()
1127 self.assertEquals(6, bufio.tell())
1128 self.assertEquals(b"ghi", read_func(bufio))
1129 raw.seek(0, 0)
1130 raw.write(b"XYZ")
1131 # flush() resets the read buffer
1132 bufio.flush()
1133 bufio.seek(0, 0)
1134 self.assertEquals(b"XYZ", read_func(bufio, 3))
1135
1136 def test_flush_and_read(self):
1137 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1138
1139 def test_flush_and_readinto(self):
1140 def _readinto(bufio, n=-1):
1141 b = bytearray(n if n >= 0 else 9999)
1142 n = bufio.readinto(b)
1143 return bytes(b[:n])
1144 self.check_flush_and_read(_readinto)
1145
1146 def test_flush_and_peek(self):
1147 def _peek(bufio, n=-1):
1148 # This relies on the fact that the buffer can contain the whole
1149 # raw stream, otherwise peek() can return less.
1150 b = bufio.peek(n)
1151 if n != -1:
1152 b = b[:n]
1153 bufio.seek(len(b), 1)
1154 return b
1155 self.check_flush_and_read(_peek)
1156
1157 def test_flush_and_write(self):
1158 raw = self.BytesIO(b"abcdefghi")
1159 bufio = self.tp(raw)
1160
1161 bufio.write(b"123")
1162 bufio.flush()
1163 bufio.write(b"45")
1164 bufio.flush()
1165 bufio.seek(0, 0)
1166 self.assertEquals(b"12345fghi", raw.getvalue())
1167 self.assertEquals(b"12345fghi", bufio.read())
1168
1169 def test_threads(self):
1170 BufferedReaderTest.test_threads(self)
1171 BufferedWriterTest.test_threads(self)
1172
1173 def test_writes_and_peek(self):
1174 def _peek(bufio):
1175 bufio.peek(1)
1176 self.check_writes(_peek)
1177 def _peek(bufio):
1178 pos = bufio.tell()
1179 bufio.seek(-1, 1)
1180 bufio.peek(1)
1181 bufio.seek(pos, 0)
1182 self.check_writes(_peek)
1183
1184 def test_writes_and_reads(self):
1185 def _read(bufio):
1186 bufio.seek(-1, 1)
1187 bufio.read(1)
1188 self.check_writes(_read)
1189
1190 def test_writes_and_read1s(self):
1191 def _read1(bufio):
1192 bufio.seek(-1, 1)
1193 bufio.read1(1)
1194 self.check_writes(_read1)
1195
1196 def test_writes_and_readintos(self):
1197 def _read(bufio):
1198 bufio.seek(-1, 1)
1199 bufio.readinto(bytearray(1))
1200 self.check_writes(_read)
1201
1202 def test_misbehaved_io(self):
1203 BufferedReaderTest.test_misbehaved_io(self)
1204 BufferedWriterTest.test_misbehaved_io(self)
1205
1206class CBufferedRandomTest(BufferedRandomTest):
1207 tp = io.BufferedRandom
1208
1209 def test_constructor(self):
1210 BufferedRandomTest.test_constructor(self)
1211 # The allocation can succeed on 32-bit builds, e.g. with more
1212 # than 2GB RAM and a 64-bit kernel.
1213 if sys.maxsize > 0x7FFFFFFF:
1214 rawio = self.MockRawIO()
1215 bufio = self.tp(rawio)
1216 self.assertRaises((OverflowError, MemoryError, ValueError),
1217 bufio.__init__, rawio, sys.maxsize)
1218
1219 def test_garbage_collection(self):
1220 CBufferedReaderTest.test_garbage_collection(self)
1221 CBufferedWriterTest.test_garbage_collection(self)
1222
1223class PyBufferedRandomTest(BufferedRandomTest):
1224 tp = pyio.BufferedRandom
1225
1226
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001227# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1228# properties:
1229# - A single output character can correspond to many bytes of input.
1230# - The number of input bytes to complete the character can be
1231# undetermined until the last input byte is received.
1232# - The number of input bytes can vary depending on previous input.
1233# - A single input byte can correspond to many characters of output.
1234# - The number of output characters can be undetermined until the
1235# last input byte is received.
1236# - The number of output characters can vary depending on previous input.
1237
1238class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1239 """
1240 For testing seek/tell behavior with a stateful, buffering decoder.
1241
1242 Input is a sequence of words. Words may be fixed-length (length set
1243 by input) or variable-length (period-terminated). In variable-length
1244 mode, extra periods are ignored. Possible words are:
1245 - 'i' followed by a number sets the input length, I (maximum 99).
1246 When I is set to 0, words are space-terminated.
1247 - 'o' followed by a number sets the output length, O (maximum 99).
1248 - Any other word is converted into a word followed by a period on
1249 the output. The output word consists of the input word truncated
1250 or padded out with hyphens to make its length equal to O. If O
1251 is 0, the word is output verbatim without truncating or padding.
1252 I and O are initially set to 1. When I changes, any buffered input is
1253 re-scanned according to the new I. EOF also terminates the last word.
1254 """
1255
1256 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00001257 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001258 self.reset()
1259
1260 def __repr__(self):
1261 return '<SID %x>' % id(self)
1262
1263 def reset(self):
1264 self.i = 1
1265 self.o = 1
1266 self.buffer = bytearray()
1267
1268 def getstate(self):
1269 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1270 return bytes(self.buffer), i*100 + o
1271
1272 def setstate(self, state):
1273 buffer, io = state
1274 self.buffer = bytearray(buffer)
1275 i, o = divmod(io, 100)
1276 self.i, self.o = i ^ 1, o ^ 1
1277
1278 def decode(self, input, final=False):
1279 output = ''
1280 for b in input:
1281 if self.i == 0: # variable-length, terminated with period
1282 if b == ord('.'):
1283 if self.buffer:
1284 output += self.process_word()
1285 else:
1286 self.buffer.append(b)
1287 else: # fixed-length, terminate after self.i bytes
1288 self.buffer.append(b)
1289 if len(self.buffer) == self.i:
1290 output += self.process_word()
1291 if final and self.buffer: # EOF terminates the last word
1292 output += self.process_word()
1293 return output
1294
1295 def process_word(self):
1296 output = ''
1297 if self.buffer[0] == ord('i'):
1298 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1299 elif self.buffer[0] == ord('o'):
1300 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1301 else:
1302 output = self.buffer.decode('ascii')
1303 if len(output) < self.o:
1304 output += '-'*self.o # pad out with hyphens
1305 if self.o:
1306 output = output[:self.o] # truncate to output length
1307 output += '.'
1308 self.buffer = bytearray()
1309 return output
1310
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001311 codecEnabled = False
1312
1313 @classmethod
1314 def lookupTestDecoder(cls, name):
1315 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00001316 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001317 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00001318 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001319 incrementalencoder=None,
1320 streamreader=None, streamwriter=None,
1321 incrementaldecoder=cls)
1322
1323# Register the previous decoder for testing.
1324# Disabled by default, tests will enable it.
1325codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1326
1327
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001328class StatefulIncrementalDecoderTest(unittest.TestCase):
1329 """
1330 Make sure the StatefulIncrementalDecoder actually works.
1331 """
1332
1333 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001334 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001335 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001336 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001337 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001338 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001339 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001340 # I=0, O=6 (variable-length input, fixed-length output)
1341 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1342 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001343 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00001344 # I=6, O=3 (fixed-length input > fixed-length output)
1345 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1346 # I=0, then 3; O=29, then 15 (with longer output)
1347 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1348 'a----------------------------.' +
1349 'b----------------------------.' +
1350 'cde--------------------------.' +
1351 'abcdefghijabcde.' +
1352 'a.b------------.' +
1353 '.c.------------.' +
1354 'd.e------------.' +
1355 'k--------------.' +
1356 'l--------------.' +
1357 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001358 ]
1359
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001360 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001361 # Try a few one-shot test cases.
1362 for input, eof, output in self.test_cases:
1363 d = StatefulIncrementalDecoder()
1364 self.assertEquals(d.decode(input, eof), output)
1365
1366 # Also test an unfinished decode, followed by forcing EOF.
1367 d = StatefulIncrementalDecoder()
1368 self.assertEquals(d.decode(b'oiabcd'), '')
1369 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001370
1371class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001372
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001373 def setUp(self):
1374 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1375 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001376 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001377
Guido van Rossumd0712812007-04-11 16:32:43 +00001378 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001379 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001380
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001381 def test_constructor(self):
1382 r = self.BytesIO(b"\xc3\xa9\n\n")
1383 b = self.BufferedReader(r, 1000)
1384 t = self.TextIOWrapper(b)
1385 t.__init__(b, encoding="latin1", newline="\r\n")
1386 self.assertEquals(t.encoding, "latin1")
1387 self.assertEquals(t.line_buffering, False)
1388 t.__init__(b, encoding="utf8", line_buffering=True)
1389 self.assertEquals(t.encoding, "utf8")
1390 self.assertEquals(t.line_buffering, True)
1391 self.assertEquals("\xe9\n", t.readline())
1392 self.assertRaises(TypeError, t.__init__, b, newline=42)
1393 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1394
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001395 def test_repr(self):
1396 raw = self.BytesIO("hello".encode("utf-8"))
1397 b = self.BufferedReader(raw)
1398 t = self.TextIOWrapper(b, encoding="utf-8")
1399 self.assertEqual(repr(t), "<TextIOWrapper encoding=utf-8>")
1400
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001401 def test_line_buffering(self):
1402 r = self.BytesIO()
1403 b = self.BufferedWriter(r, 1000)
1404 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001405 t.write("X")
1406 self.assertEquals(r.getvalue(), b"") # No flush happened
1407 t.write("Y\nZ")
1408 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
1409 t.write("A\rB")
1410 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
1411
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001412 def test_encoding(self):
1413 # Check the encoding attribute is always set, and valid
1414 b = self.BytesIO()
1415 t = self.TextIOWrapper(b, encoding="utf8")
1416 self.assertEqual(t.encoding, "utf8")
1417 t = self.TextIOWrapper(b)
1418 self.assert_(t.encoding is not None)
1419 codecs.lookup(t.encoding)
1420
1421 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001422 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001423 b = self.BytesIO(b"abc\n\xff\n")
1424 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001425 self.assertRaises(UnicodeError, t.read)
1426 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001427 b = self.BytesIO(b"abc\n\xff\n")
1428 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001429 self.assertRaises(UnicodeError, t.read)
1430 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001431 b = self.BytesIO(b"abc\n\xff\n")
1432 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001433 self.assertEquals(t.read(), "abc\n\n")
1434 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001435 b = self.BytesIO(b"abc\n\xff\n")
1436 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001437 self.assertEquals(t.read(), "abc\n\ufffd\n")
1438
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001439 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001440 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001441 b = self.BytesIO()
1442 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001443 self.assertRaises(UnicodeError, t.write, "\xff")
1444 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001445 b = self.BytesIO()
1446 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001447 self.assertRaises(UnicodeError, t.write, "\xff")
1448 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001449 b = self.BytesIO()
1450 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001451 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001452 t.write("abc\xffdef\n")
1453 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001454 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001455 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001456 b = self.BytesIO()
1457 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00001458 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001459 t.write("abc\xffdef\n")
1460 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +00001461 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001462
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001463 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001464 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1465
1466 tests = [
1467 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00001468 [ '', input_lines ],
1469 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1470 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1471 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00001472 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00001473 encodings = (
1474 'utf-8', 'latin-1',
1475 'utf-16', 'utf-16-le', 'utf-16-be',
1476 'utf-32', 'utf-32-le', 'utf-32-be',
1477 )
Guido van Rossum78892e42007-04-06 17:31:18 +00001478
Guido van Rossum8358db22007-08-18 21:39:55 +00001479 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00001480 # character in TextIOWrapper._pending_line.
1481 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00001482 # XXX: str.encode() should return bytes
1483 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00001484 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00001485 for bufsize in range(1, 10):
1486 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001487 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1488 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00001489 encoding=encoding)
1490 if do_reads:
1491 got_lines = []
1492 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001493 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00001494 if c2 == '':
1495 break
1496 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001497 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00001498 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001499 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00001500
1501 for got_line, exp_line in zip(got_lines, exp_lines):
1502 self.assertEquals(got_line, exp_line)
1503 self.assertEquals(len(got_lines), len(exp_lines))
1504
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001505 def test_newlines_input(self):
1506 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001507 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1508 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001509 (None, normalized.decode("ascii").splitlines(True)),
1510 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001511 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1512 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1513 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001514 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001515 buf = self.BytesIO(testdata)
1516 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001517 self.assertEquals(txt.readlines(), expected)
1518 txt.seek(0)
1519 self.assertEquals(txt.read(), "".join(expected))
1520
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001521 def test_newlines_output(self):
1522 testdict = {
1523 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1524 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1525 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1526 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1527 }
1528 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1529 for newline, expected in tests:
1530 buf = self.BytesIO()
1531 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1532 txt.write("AAA\nB")
1533 txt.write("BB\nCCC\n")
1534 txt.write("X\rY\r\nZ")
1535 txt.flush()
1536 self.assertEquals(buf.closed, False)
1537 self.assertEquals(buf.getvalue(), expected)
1538
1539 def test_destructor(self):
1540 l = []
1541 base = self.BytesIO
1542 class MyBytesIO(base):
1543 def close(self):
1544 l.append(self.getvalue())
1545 base.close(self)
1546 b = MyBytesIO()
1547 t = self.TextIOWrapper(b, encoding="ascii")
1548 t.write("abc")
1549 del t
1550 self.assertEquals([b"abc"], l)
1551
1552 def test_override_destructor(self):
1553 record = []
1554 class MyTextIO(self.TextIOWrapper):
1555 def __del__(self):
1556 record.append(1)
1557 try:
1558 f = super().__del__
1559 except AttributeError:
1560 pass
1561 else:
1562 f()
1563 def close(self):
1564 record.append(2)
1565 super().close()
1566 def flush(self):
1567 record.append(3)
1568 super().flush()
1569 b = self.BytesIO()
1570 t = MyTextIO(b, encoding="ascii")
1571 del t
1572 self.assertEqual(record, [1, 2, 3])
1573
1574 def test_error_through_destructor(self):
1575 # Test that the exception state is not modified by a destructor,
1576 # even if close() fails.
1577 rawio = self.CloseFailureIO()
1578 def f():
1579 self.TextIOWrapper(rawio).xyzzy
1580 with support.captured_output("stderr") as s:
1581 self.assertRaises(AttributeError, f)
1582 s = s.getvalue().strip()
1583 if s:
1584 # The destructor *may* have printed an unraisable error, check it
1585 self.assertEqual(len(s.splitlines()), 1)
1586 self.assert_(s.startswith("Exception IOError: "), s)
1587 self.assert_(s.endswith(" ignored"), s)
Guido van Rossum8358db22007-08-18 21:39:55 +00001588
Guido van Rossum9b76da62007-04-11 01:09:03 +00001589 # Systematic tests of the text I/O API
1590
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001591 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001592 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
1593 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001594 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001595 f._CHUNK_SIZE = chunksize
1596 self.assertEquals(f.write("abc"), 3)
1597 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001598 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001599 f._CHUNK_SIZE = chunksize
1600 self.assertEquals(f.tell(), 0)
1601 self.assertEquals(f.read(), "abc")
1602 cookie = f.tell()
1603 self.assertEquals(f.seek(0), 0)
1604 self.assertEquals(f.read(2), "ab")
1605 self.assertEquals(f.read(1), "c")
1606 self.assertEquals(f.read(1), "")
1607 self.assertEquals(f.read(), "")
1608 self.assertEquals(f.tell(), cookie)
1609 self.assertEquals(f.seek(0), 0)
1610 self.assertEquals(f.seek(0, 2), cookie)
1611 self.assertEquals(f.write("def"), 3)
1612 self.assertEquals(f.seek(cookie), cookie)
1613 self.assertEquals(f.read(), "def")
1614 if enc.startswith("utf"):
1615 self.multi_line_test(f, enc)
1616 f.close()
1617
1618 def multi_line_test(self, f, enc):
1619 f.seek(0)
1620 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001621 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001622 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00001623 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001624 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00001625 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001626 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001627 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00001628 wlines.append((f.tell(), line))
1629 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001630 f.seek(0)
1631 rlines = []
1632 while True:
1633 pos = f.tell()
1634 line = f.readline()
1635 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00001636 break
1637 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +00001638 self.assertEquals(rlines, wlines)
1639
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001640 def test_telling(self):
1641 f = self.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001642 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001643 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001644 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001645 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001646 p2 = f.tell()
1647 f.seek(0)
1648 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001649 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001650 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001651 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001652 self.assertEquals(f.tell(), p2)
1653 f.seek(0)
1654 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001655 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001656 self.assertRaises(IOError, f.tell)
1657 self.assertEquals(f.tell(), p2)
1658 f.close()
1659
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001660 def test_seeking(self):
1661 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00001662 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001663 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001664 prefix = bytes(u_prefix.encode("utf-8"))
1665 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001666 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001667 suffix = bytes(u_suffix.encode("utf-8"))
1668 line = prefix + suffix
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001669 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001670 f.write(line*2)
1671 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001672 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001673 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001674 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001675 self.assertEquals(f.tell(), prefix_size)
1676 self.assertEquals(f.readline(), u_suffix)
1677
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001678 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00001679 # Regression test for a specific bug
1680 data = b'\xe0\xbf\xbf\n'
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001681 f = self.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001682 f.write(data)
1683 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001684 f = self.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001685 f._CHUNK_SIZE # Just test that it exists
1686 f._CHUNK_SIZE = 2
1687 f.readline()
1688 f.tell()
1689
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001690 def test_seek_and_tell(self):
1691 #Test seek/tell using the StatefulIncrementalDecoder.
1692 # Make test faster by doing smaller seeks
1693 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001694
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001695 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001696 """Tell/seek to various points within a data stream and ensure
1697 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001698 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001699 f.write(data)
1700 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001701 f = self.open(support.TESTFN, encoding='test_decoder')
1702 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001703 decoded = f.read()
1704 f.close()
1705
Neal Norwitze2b07052008-03-18 19:52:05 +00001706 for i in range(min_pos, len(decoded) + 1): # seek positions
1707 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001708 f = self.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001709 self.assertEquals(f.read(i), decoded[:i])
1710 cookie = f.tell()
1711 self.assertEquals(f.read(j), decoded[i:i + j])
1712 f.seek(cookie)
1713 self.assertEquals(f.read(), decoded[i:])
1714 f.close()
1715
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001716 # Enable the test decoder.
1717 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001718
1719 # Run the tests.
1720 try:
1721 # Try each test case.
1722 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001723 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001724
1725 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001726 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1727 offset = CHUNK_SIZE - len(input)//2
1728 prefix = b'.'*offset
1729 # Don't bother seeking into the prefix (takes too long).
1730 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00001731 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001732
1733 # Ensure our test decoder won't interfere with subsequent tests.
1734 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001735 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001736
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001737 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001738 data = "1234567890"
1739 tests = ("utf-16",
1740 "utf-16-le",
1741 "utf-16-be",
1742 "utf-32",
1743 "utf-32-le",
1744 "utf-32-be")
1745 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001746 buf = self.BytesIO()
1747 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001748 # Check if the BOM is written only once (see issue1753).
1749 f.write(data)
1750 f.write(data)
1751 f.seek(0)
1752 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00001753 f.seek(0)
1754 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001755 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1756
Benjamin Petersona1b49012009-03-31 23:11:32 +00001757 def test_unreadable(self):
1758 class UnReadable(self.BytesIO):
1759 def readable(self):
1760 return False
1761 txt = self.TextIOWrapper(UnReadable())
1762 self.assertRaises(IOError, txt.read)
1763
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001764 def test_read_one_by_one(self):
1765 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001766 reads = ""
1767 while True:
1768 c = txt.read(1)
1769 if not c:
1770 break
1771 reads += c
1772 self.assertEquals(reads, "AA\nBB")
1773
1774 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001775 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001776 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001777 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001778 reads = ""
1779 while True:
1780 c = txt.read(128)
1781 if not c:
1782 break
1783 reads += c
1784 self.assertEquals(reads, "A"*127+"\nB")
1785
1786 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001787 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001788
1789 # read one char at a time
1790 reads = ""
1791 while True:
1792 c = txt.read(1)
1793 if not c:
1794 break
1795 reads += c
1796 self.assertEquals(reads, self.normalized)
1797
1798 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001799 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001800 txt._CHUNK_SIZE = 4
1801
1802 reads = ""
1803 while True:
1804 c = txt.read(4)
1805 if not c:
1806 break
1807 reads += c
1808 self.assertEquals(reads, self.normalized)
1809
1810 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001811 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001812 txt._CHUNK_SIZE = 4
1813
1814 reads = txt.read(4)
1815 reads += txt.read(4)
1816 reads += txt.readline()
1817 reads += txt.readline()
1818 reads += txt.readline()
1819 self.assertEquals(reads, self.normalized)
1820
1821 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001822 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001823 txt._CHUNK_SIZE = 4
1824
1825 reads = txt.read(4)
1826 reads += txt.read()
1827 self.assertEquals(reads, self.normalized)
1828
1829 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001830 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001831 txt._CHUNK_SIZE = 4
1832
1833 reads = txt.read(4)
1834 pos = txt.tell()
1835 txt.seek(0)
1836 txt.seek(pos)
1837 self.assertEquals(txt.read(4), "BBB\n")
1838
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001839 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001840 buffer = self.BytesIO(self.testdata)
1841 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001842
1843 self.assertEqual(buffer.seekable(), txt.seekable())
1844
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001845class CTextIOWrapperTest(TextIOWrapperTest):
1846
1847 def test_initialization(self):
1848 r = self.BytesIO(b"\xc3\xa9\n\n")
1849 b = self.BufferedReader(r, 1000)
1850 t = self.TextIOWrapper(b)
1851 self.assertRaises(TypeError, t.__init__, b, newline=42)
1852 self.assertRaises(ValueError, t.read)
1853 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1854 self.assertRaises(ValueError, t.read)
1855
1856 def test_garbage_collection(self):
1857 # C TextIOWrapper objects are collected, and collecting them flushes
1858 # all data to disk.
1859 # The Python version has __del__, so it ends in gc.garbage instead.
1860 rawio = io.FileIO(support.TESTFN, "wb")
1861 b = self.BufferedWriter(rawio)
1862 t = self.TextIOWrapper(b, encoding="ascii")
1863 t.write("456def")
1864 t.x = t
1865 wr = weakref.ref(t)
1866 del t
1867 gc.collect()
1868 self.assert_(wr() is None, wr)
1869 with open(support.TESTFN, "rb") as f:
1870 self.assertEqual(f.read(), b"456def")
1871
1872class PyTextIOWrapperTest(TextIOWrapperTest):
1873 pass
1874
1875
1876class IncrementalNewlineDecoderTest(unittest.TestCase):
1877
1878 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00001879 # UTF-8 specific tests for a newline decoder
1880 def _check_decode(b, s, **kwargs):
1881 # We exercise getstate() / setstate() as well as decode()
1882 state = decoder.getstate()
1883 self.assertEquals(decoder.decode(b, **kwargs), s)
1884 decoder.setstate(state)
1885 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001886
Antoine Pitrou180a3362008-12-14 16:36:46 +00001887 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001888
Antoine Pitrou180a3362008-12-14 16:36:46 +00001889 _check_decode(b'\xe8', "")
1890 _check_decode(b'\xa2', "")
1891 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001892
Antoine Pitrou180a3362008-12-14 16:36:46 +00001893 _check_decode(b'\xe8', "")
1894 _check_decode(b'\xa2', "")
1895 _check_decode(b'\x88', "\u8888")
1896
1897 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001898 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1899
Antoine Pitrou180a3362008-12-14 16:36:46 +00001900 decoder.reset()
1901 _check_decode(b'\n', "\n")
1902 _check_decode(b'\r', "")
1903 _check_decode(b'', "\n", final=True)
1904 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001905
Antoine Pitrou180a3362008-12-14 16:36:46 +00001906 _check_decode(b'\r', "")
1907 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001908
Antoine Pitrou180a3362008-12-14 16:36:46 +00001909 _check_decode(b'\r\r\n', "\n\n")
1910 _check_decode(b'\r', "")
1911 _check_decode(b'\r', "\n")
1912 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001913
Antoine Pitrou180a3362008-12-14 16:36:46 +00001914 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
1915 _check_decode(b'\xe8\xa2\x88', "\u8888")
1916 _check_decode(b'\n', "\n")
1917 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
1918 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001919
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001920 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00001921 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001922 if encoding is not None:
1923 encoder = codecs.getincrementalencoder(encoding)()
1924 def _decode_bytewise(s):
1925 # Decode one byte at a time
1926 for b in encoder.encode(s):
1927 result.append(decoder.decode(bytes([b])))
1928 else:
1929 encoder = None
1930 def _decode_bytewise(s):
1931 # Decode one char at a time
1932 for c in s:
1933 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00001934 self.assertEquals(decoder.newlines, None)
1935 _decode_bytewise("abc\n\r")
1936 self.assertEquals(decoder.newlines, '\n')
1937 _decode_bytewise("\nabc")
1938 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1939 _decode_bytewise("abc\r")
1940 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1941 _decode_bytewise("abc")
1942 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1943 _decode_bytewise("abc\r")
1944 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
1945 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001946 input = "abc"
1947 if encoder is not None:
1948 encoder.reset()
1949 input = encoder.encode(input)
1950 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00001951 self.assertEquals(decoder.newlines, None)
1952
1953 def test_newline_decoder(self):
1954 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001955 # None meaning the IncrementalNewlineDecoder takes unicode input
1956 # rather than bytes input
1957 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00001958 'utf-16', 'utf-16-le', 'utf-16-be',
1959 'utf-32', 'utf-32-le', 'utf-32-be',
1960 )
1961 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001962 decoder = enc and codecs.getincrementaldecoder(enc)()
1963 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
1964 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001965 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001966 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
1967 self.check_newline_decoding_utf8(decoder)
1968
Antoine Pitrou66913e22009-03-06 23:40:56 +00001969 def test_newline_bytes(self):
1970 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
1971 def _check(dec):
1972 self.assertEquals(dec.newlines, None)
1973 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
1974 self.assertEquals(dec.newlines, None)
1975 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
1976 self.assertEquals(dec.newlines, None)
1977 dec = self.IncrementalNewlineDecoder(None, translate=False)
1978 _check(dec)
1979 dec = self.IncrementalNewlineDecoder(None, translate=True)
1980 _check(dec)
1981
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001982class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
1983 pass
1984
1985class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
1986 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00001987
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001988
Guido van Rossum01a27522007-03-07 01:00:12 +00001989# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001990
Guido van Rossum5abbf752007-08-27 17:39:33 +00001991class MiscIOTest(unittest.TestCase):
1992
Barry Warsaw40e82462008-11-20 20:14:50 +00001993 def tearDown(self):
1994 support.unlink(support.TESTFN)
1995
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001996 def test___all__(self):
1997 for name in self.io.__all__:
1998 obj = getattr(self.io, name, None)
Guido van Rossum5abbf752007-08-27 17:39:33 +00001999 self.assert_(obj is not None, name)
2000 if name == "open":
2001 continue
2002 elif "error" in name.lower():
2003 self.assert_(issubclass(obj, Exception), name)
2004 else:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002005 self.assert_(issubclass(obj, self.IOBase), name)
Benjamin Peterson65676e42008-11-05 21:42:45 +00002006
Barry Warsaw40e82462008-11-20 20:14:50 +00002007 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002008 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002009 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002010 f.close()
2011
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002012 f = self.open(support.TESTFN, "U")
Barry Warsaw40e82462008-11-20 20:14:50 +00002013 self.assertEquals(f.name, support.TESTFN)
2014 self.assertEquals(f.buffer.name, support.TESTFN)
2015 self.assertEquals(f.buffer.raw.name, support.TESTFN)
2016 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002017 self.assertEquals(f.buffer.mode, "rb")
2018 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002019 f.close()
2020
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002021 f = self.open(support.TESTFN, "w+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002022 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002023 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2024 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002025
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002026 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002027 self.assertEquals(g.mode, "wb")
2028 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002029 self.assertEquals(g.name, f.fileno())
2030 self.assertEquals(g.raw.name, f.fileno())
2031 f.close()
2032 g.close()
2033
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002034 def test_io_after_close(self):
2035 for kwargs in [
2036 {"mode": "w"},
2037 {"mode": "wb"},
2038 {"mode": "w", "buffering": 1},
2039 {"mode": "w", "buffering": 2},
2040 {"mode": "wb", "buffering": 0},
2041 {"mode": "r"},
2042 {"mode": "rb"},
2043 {"mode": "r", "buffering": 1},
2044 {"mode": "r", "buffering": 2},
2045 {"mode": "rb", "buffering": 0},
2046 {"mode": "w+"},
2047 {"mode": "w+b"},
2048 {"mode": "w+", "buffering": 1},
2049 {"mode": "w+", "buffering": 2},
2050 {"mode": "w+b", "buffering": 0},
2051 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002052 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002053 f.close()
2054 self.assertRaises(ValueError, f.flush)
2055 self.assertRaises(ValueError, f.fileno)
2056 self.assertRaises(ValueError, f.isatty)
2057 self.assertRaises(ValueError, f.__iter__)
2058 if hasattr(f, "peek"):
2059 self.assertRaises(ValueError, f.peek, 1)
2060 self.assertRaises(ValueError, f.read)
2061 if hasattr(f, "read1"):
2062 self.assertRaises(ValueError, f.read1, 1024)
2063 if hasattr(f, "readinto"):
2064 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2065 self.assertRaises(ValueError, f.readline)
2066 self.assertRaises(ValueError, f.readlines)
2067 self.assertRaises(ValueError, f.seek, 0)
2068 self.assertRaises(ValueError, f.tell)
2069 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002070 self.assertRaises(ValueError, f.write,
2071 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002072 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002073 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002074
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002075 def test_blockingioerror(self):
2076 # Various BlockingIOError issues
2077 self.assertRaises(TypeError, self.BlockingIOError)
2078 self.assertRaises(TypeError, self.BlockingIOError, 1)
2079 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2080 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2081 b = self.BlockingIOError(1, "")
2082 self.assertEqual(b.characters_written, 0)
2083 class C(str):
2084 pass
2085 c = C("")
2086 b = self.BlockingIOError(1, c)
2087 c.b = b
2088 b.c = c
2089 wr = weakref.ref(c)
2090 del c, b
2091 gc.collect()
2092 self.assert_(wr() is None, wr)
2093
2094 def test_abcs(self):
2095 # Test the visible base classes are ABCs.
2096 self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
2097 self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
2098 self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
2099 self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))
2100
2101 def _check_abc_inheritance(self, abcmodule):
2102 with self.open(support.TESTFN, "wb", buffering=0) as f:
2103 self.assertTrue(isinstance(f, abcmodule.IOBase))
2104 self.assertTrue(isinstance(f, abcmodule.RawIOBase))
2105 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2106 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2107 with self.open(support.TESTFN, "wb") as f:
2108 self.assertTrue(isinstance(f, abcmodule.IOBase))
2109 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2110 self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
2111 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2112 with self.open(support.TESTFN, "w") as f:
2113 self.assertTrue(isinstance(f, abcmodule.IOBase))
2114 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2115 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2116 self.assertTrue(isinstance(f, abcmodule.TextIOBase))
2117
2118 def test_abc_inheritance(self):
2119 # Test implementations inherit from their respective ABCs
2120 self._check_abc_inheritance(self)
2121
2122 def test_abc_inheritance_official(self):
2123 # Test implementations inherit from the official ABCs of the
2124 # baseline "io" module.
2125 self._check_abc_inheritance(io)
2126
2127class CMiscIOTest(MiscIOTest):
2128 io = io
2129
2130class PyMiscIOTest(MiscIOTest):
2131 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002132
Guido van Rossum28524c72007-02-27 05:47:44 +00002133def test_main():
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002134 tests = (CIOTest, PyIOTest,
2135 CBufferedReaderTest, PyBufferedReaderTest,
2136 CBufferedWriterTest, PyBufferedWriterTest,
2137 CBufferedRWPairTest, PyBufferedRWPairTest,
2138 CBufferedRandomTest, PyBufferedRandomTest,
2139 StatefulIncrementalDecoderTest,
2140 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
2141 CTextIOWrapperTest, PyTextIOWrapperTest,
2142 CMiscIOTest, PyMiscIOTest,)
2143
2144 # Put the namespaces of the IO module we are testing and some useful mock
2145 # classes in the __dict__ of each test.
2146 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
2147 MockNonBlockWriterIO)
2148 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
2149 c_io_ns = {name : getattr(io, name) for name in all_members}
2150 py_io_ns = {name : getattr(pyio, name) for name in all_members}
2151 globs = globals()
2152 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
2153 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
2154 # Avoid turning open into a bound method.
2155 py_io_ns["open"] = pyio.OpenWrapper
2156 for test in tests:
2157 if test.__name__.startswith("C"):
2158 for name, obj in c_io_ns.items():
2159 setattr(test, name, obj)
2160 elif test.__name__.startswith("Py"):
2161 for name, obj in py_io_ns.items():
2162 setattr(test, name, obj)
2163
2164 support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002165
2166if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002167 test_main()